diff --git a/.gitignore b/.gitignore index 5b57e2dd..5e46985a 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,10 @@ docs/public docs/resources/_gen/ docs/.hugo_build.lock test/integration/**/clab-* + + +# Only for development and testing purposes +# To be removed after development of targetsource +# ignored in order to not add unnecassary logging messages +lab/dev/resources/targetsources +.scannerwork/ \ No newline at end of file diff --git a/Makefile b/Makefile index fdcc2b24..e8ddee2b 100644 --- a/Makefile +++ b/Makefile @@ -308,9 +308,10 @@ delete-targetsources-dev-lab: ## Delete the target sources for the development l ##@ Testing Lab .PHONY: run-integration-tests -run-integration-tests: docker-build undeploy-test-cluster deploy-test-cluster install-test-cluster-dependencies load-test-image deploy install-kubectl install-gnmic install-containerlab deploy-test-topology apply-test-resources +run-integration-tests: docker-build undeploy-test-cluster deploy-test-cluster install-test-cluster-dependencies load-test-image deploy install-kubectl install-gnmic install-containerlab deploy-test-topology deploy-test-http-server apply-test-resources kubectl wait --for=condition=Ready cluster --all --timeout=180s kubectl wait --for=condition=Ready pipeline --all --timeout=180s + kubectl wait --for=condition=Ready targetsource --all --timeout=180s kubectl wait --for=jsonpath='{.status.connectionState}'=READY target --all --timeout=180s kubectl get subscriptions -o yaml kubectl get outputs -o yaml diff --git a/api/v1alpha1/targetsource_types.go b/api/v1alpha1/targetsource_types.go index 26a106f5..ba9f9d92 100644 --- a/api/v1alpha1/targetsource_types.go +++ b/api/v1alpha1/targetsource_types.go @@ -58,18 +58,64 @@ type HTTPConfig struct { // +kubebuilder:validation:Optional URL string `json:"url,omitempty"` + // HTTP method used for the request. + // + // Defaults to GET if not specified. + // + // Supported values: + // - GET (default, no request body) + // - POST (supports request body) + // + // +kubebuilder:validation:Enum=GET;POST + // +kubebuilder:default="GET" + // +kubebuilder:validation:Optional + Method string `json:"method,omitempty"` + + // Optional HTTP headers to include in the request. + // + // These map directly to HTTP headers (key-value pairs). + // + // Example: + // headers: + // Content-Type: application/json + // X-Custom-Header: value + // + // Precedence: + // - Authentication configuration overrides any conflicting headers e.g. Authorization + // + // +kubebuilder:validation:Optional + Headers map[string]string `json:"headers,omitempty"` + + // Optional raw request body. + // + // Typically used with POST requests and contains JSON payload. + // + // Example: + // body: | + // { + // "limit": 100, + // "status": "active" + // } + // + // Notes: + // - Ignored for GET requests + // - User must set appropriate Content-Type header if needed + // + // +kubebuilder:validation:Optional + Body string `json:"body,omitempty"` + // Optional authentication configuration for accessing the HTTP endpoint // +kubebuilder:validation:Optional Authentication *AuthenticationSpec `json:"authentication,omitempty"` // Optional interval for polling the HTTP endpoint for targets // TODO: document about default value - // +kubebuilder:default="6h" + // +kubebuilder:default="30m" // +kubebuilder:validation:Optional Interval *metav1.Duration `json:"interval,omitempty"` // Optional timeout for HTTP requests to the endpoint - // +kubebuilder:default="10s" + // +kubebuilder:default="30s" // +kubebuilder:validation:Optional Timeout *metav1.Duration `json:"timeout,omitempty"` @@ -132,51 +178,186 @@ type TokenAuthSpec struct { TokenSecretRef *corev1.SecretKeySelector `json:"tokenSecretRef,omitempty"` } -// PaginationSpec defines the configuration for paginating through responses from providers +// PaginationSpec defines how pagination is handled for HTTP APIs. +// +// The pagination mechanism is fully server-driven. The loader will repeatedly: +// 1. Extract the "next" reference from the response +// 2. Use it to construct the next request +// 3. Continue until no next reference is returned +// +// Supported pagination styles: +// 1. Cursor-based: +// - Response returns a token (e.g. "next_page_token") +// - Client sends it back via a query parameter (e.g. "page_token") +// 2. URL-based (nextLink): +// - Response returns a full URL +// - Client follows it directly without modification +// 3. Expression-based extraction: +// - The next reference is extracted using a CEL expression +// - This allows access to nested fields or special keys +// (e.g. "@odata.nextLink") +// +// Behavior: +// - If the extracted value is a full URL, it will be used as-is +// - Otherwise, it is treated as a token and appended using RequestParam +// - The token is treated as opaque and must not be interpreted +// +// Example: +// +// pagination: +// nextField: "self.next_page_token" +// requestParam: "page_token" +// +// pagination: +// nextField: "self['@odata.nextLink']" type PaginationSpec struct { - // Field name in the JSON response that contains the next page reference. - // The value can be either: - // - a full URL (used directly for the next request), or - // - a pagination token (appended as a query parameter using this field name as the key). + // CEL expression used to extract the next page reference from the response. + // + // The expression is evaluated with: + // self -> full JSON response // - // Must refer to a top-level key in the response object. - // Example: "next" or "nextToken" + // It must evaluate to either: + // - string (full URL OR token), or + // - null (indicates end of pagination) + // + // Examples: + // "self.next" + // "self.next_page_token" + // "self['@odata.nextLink']" + // + // +kubebuilder:validation:Optional NextField string `json:"nextField,omitempty"` + + // Query parameter name used when the extracted value is a token. + // + // Required for token-based pagination. + // Ignored when NextField resolves to a full URL. + // + // Example: + // requestParam: "page_token" + // + // +kubebuilder:validation:Optional + RequestParam string `json:"requestParam,omitempty"` } -// CEL expressions to extract target fields from the response -// and map them to the corresponding Target fields. +// ResponseMappingSpec controls how targets are extracted from an HTTP JSON response. +// +// This allows you to map fields from a JSON API into targets using either: +// - simple direct field access (e.g. item["name"]) +// - or CEL expressions for more advanced logic +// +// General behavior: +// +// 1. Selecting targets: +// - `targetsField` is a CEL expression that selects the list of targets +// - It runs once on the full response (`self`) and MUST return a list +// - If not set, the response itself must be a JSON array +// +// 2. Extracting fields: +// - Each field (name, address, port, labels, etc.) is handled independently +// - If a CEL expression is provided → it is evaluated +// - If not provided → the value is read directly from the target object +// +// 3. Available variables in CEL: +// - item -> the current target object +// - self -> the full HTTP response JSON +// +// Example: +// +// Response: +// { +// "results": [ +// { "name": "device1", "ip": "10.0.0.1", "env": "prod" } +// ], +// "meta": { "region": "eu-west" } +// } +// +// Mapping: +// targetsField: "self.results" +// +// name: "" # direct → item["name"] +// address: "item.ip" # CEL +// +// labels: +// env: "item.env" +// region: "self.meta.region" type ResponseMappingSpec struct { - // Field name in the JSON response that contains the list of items (targets). - // If not specified, the entire response is expected to be a list of items. - // All subsequent fields are specified relative to this field - // Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + // CEL expression that selects the list of target objects from the response. + // + // This is evaluated once using: + // self -> full JSON response + // + // Example: + // targetsField: "self.results" + // + // If not set, the response itself must be a JSON array with the targets. + // // +kubebuilder:validation:Optional TargetsField string `json:"targetsField,omitempty"` - // CEL expression to extract the target name from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target name. + // + // If not set, defaults to: + // item["name"] + // + // Example: + // "item.hostname" + // // +kubebuilder:validation:Optional - Name string `json:"name"` + Name string `json:"name,omitempty"` - // CEL expression to extract the target Address from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target address. + // + // If not set, defaults to: + // item["address"] + // + // Example: + // "item.ip" + // // +kubebuilder:validation:Optional - Address string `json:"address"` + Address string `json:"address,omitempty"` - // CEL expression to extract the target port from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target port. + // + // If not set, defaults to: + // item["port"] + // + // Example: + // "item.port" + // // +kubebuilder:validation:Optional Port string `json:"port,omitempty"` - // CEL expression to extract the target labels from the response + // CEL expression that returns a map of labels. + // The expression must evaluate to an object (map). + // + // Example: + // + // labels: | + // { + // "env": item.environment, + // "region": self.meta.region, + // item.dynamicKey: "value" + // } + // + // If not set, defaults to: + // item["labels"] + // + // The resulting map will be converted into labels. // The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, // with values from the response taking precedence in case of conflicts. + // // +kubebuilder:validation:Optional - Labels map[string]string `json:"labels,omitempty"` + Labels string `json:"labels,omitempty"` - // CEL expression to extract the target profile from the response - // If TargetsField is specified, this should be relative to TargetsField + // CEL expression for the target profile. + // + // If not set, defaults to: + // item["targetProfile"] + // + // Example: + // "item.type == 'edge' ? 'edge-profile' : 'default'" + // // +kubebuilder:validation:Optional TargetProfile string `json:"targetProfile,omitempty"` } @@ -216,10 +397,10 @@ type PushSignatureAuthSpec struct { // TargetSourceStatus defines the observed state of TargetSource type TargetSourceStatus struct { - Status string `json:"status,omitempty"` - ObservedGeneration int64 `json:"observedGeneration"` - TargetsCount int32 `json:"targetsCount,omitempty"` - LastSync metav1.Time `json:"lastSync,omitempty"` + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + TargetsCount int32 `json:"targetsCount,omitempty"` + LastSync metav1.Time `json:"lastSync,omitempty"` + Conditions []metav1.Condition `json:"conditions,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 201a35da..8c6d682c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -323,6 +323,13 @@ func (in *GRPCTunnelConfig) DeepCopy() *GRPCTunnelConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { *out = *in + if in.Headers != nil { + in, out := &in.Headers, &out.Headers + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } if in.Authentication != nil { in, out := &in.Authentication, &out.Authentication *out = new(AuthenticationSpec) @@ -351,7 +358,7 @@ func (in *HTTPConfig) DeepCopyInto(out *HTTPConfig) { if in.ResponseMapping != nil { in, out := &in.ResponseMapping, &out.ResponseMapping *out = new(ResponseMappingSpec) - (*in).DeepCopyInto(*out) + **out = **in } if in.Push != nil { in, out := &in.Push, &out.Push @@ -1026,13 +1033,6 @@ func (in *PushSpec) DeepCopy() *PushSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResponseMappingSpec) DeepCopyInto(out *ResponseMappingSpec) { *out = *in - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResponseMappingSpec. @@ -1517,6 +1517,13 @@ func (in *TargetSourceSpec) DeepCopy() *TargetSourceSpec { func (in *TargetSourceStatus) DeepCopyInto(out *TargetSourceStatus) { *out = *in in.LastSync.DeepCopyInto(&out.LastSync) + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]metav1.Condition, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TargetSourceStatus. diff --git a/config/crd/bases/operator.gnmic.dev_targetsources.yaml b/config/crd/bases/operator.gnmic.dev_targetsources.yaml index 4ecef754..5cd849fa 100644 --- a/config/crd/bases/operator.gnmic.dev_targetsources.yaml +++ b/config/crd/bases/operator.gnmic.dev_targetsources.yaml @@ -127,8 +127,41 @@ spec: be set rule: '[has(self.basic),has(self.token)].filter(x,x==true).size() == 1' + body: + description: |- + Optional raw request body. + + Typically used with POST requests and contains JSON payload. + + Example: + body: | + { + "limit": 100, + "status": "active" + } + + Notes: + - Ignored for GET requests + - User must set appropriate Content-Type header if needed + type: string + headers: + additionalProperties: + type: string + description: |- + Optional HTTP headers to include in the request. + + These map directly to HTTP headers (key-value pairs). + + Example: + headers: + Content-Type: application/json + X-Custom-Header: value + + Precedence: + - Authentication configuration overrides any conflicting headers e.g. Authorization + type: object interval: - default: 6h + default: 30m description: Optional interval for polling the HTTP endpoint for targets type: string @@ -138,53 +171,121 @@ spec: properties: address: description: |- - CEL expression to extract the target Address from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target address. + + If not set, defaults to: + item["address"] + + Example: + "item.ip" type: string labels: - additionalProperties: - type: string description: |- - CEL expression to extract the target labels from the response + CEL expression that returns a map of labels. + The expression must evaluate to an object (map). + + Example: + + labels: | + { + "env": item.environment, + "region": self.meta.region, + item.dynamicKey: "value" + } + + If not set, defaults to: + item["labels"] + + The resulting map will be converted into labels. The extracted labels will be merged with the static TargetLabels defined in the TargetSourceSpec, with values from the response taking precedence in case of conflicts. - type: object + type: string name: description: |- - CEL expression to extract the target name from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target name. + + If not set, defaults to: + item["name"] + + Example: + "item.hostname" type: string port: description: |- - CEL expression to extract the target port from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target port. + + If not set, defaults to: + item["port"] + + Example: + "item.port" type: string targetProfile: description: |- - CEL expression to extract the target profile from the response - If TargetsField is specified, this should be relative to TargetsField + CEL expression for the target profile. + + If not set, defaults to: + item["targetProfile"] + + Example: + "item.type == 'edge' ? 'edge-profile' : 'default'" type: string targetsField: description: |- - Field name in the JSON response that contains the list of items (targets). - If not specified, the entire response is expected to be a list of items. - All subsequent fields are specified relative to this field - Example: "results" if the response is of the form {"results": [ ... list of items ... ]} + CEL expression that selects the list of target objects from the response. + + This is evaluated once using: + self -> full JSON response + + Example: + targetsField: "self.results" + + If not set, the response itself must be a JSON array with the targets. type: string type: object + method: + default: GET + description: |- + HTTP method used for the request. + + Defaults to GET if not specified. + + Supported values: + - GET (default, no request body) + - POST (supports request body) + enum: + - GET + - POST + type: string pagination: description: Optional pagination configuration for parsing responses from the HTTP endpoint properties: nextField: description: |- - Field name in the JSON response that contains the next page reference. - The value can be either: - - a full URL (used directly for the next request), or - - a pagination token (appended as a query parameter using this field name as the key). + CEL expression used to extract the next page reference from the response. + + The expression is evaluated with: + self -> full JSON response + + It must evaluate to either: + - string (full URL OR token), or + - null (indicates end of pagination) - Must refer to a top-level key in the response object. - Example: "next" or "nextToken" + Examples: + "self.next" + "self.next_page_token" + "self['@odata.nextLink']" + type: string + requestParam: + description: |- + Query parameter name used when the extracted value is a token. + + Required for token-based pagination. + Ignored when NextField resolves to a full URL. + + Example: + requestParam: "page_token" type: string type: object push: @@ -276,7 +377,7 @@ spec: - enabled type: object timeout: - default: 10s + default: 30s description: Optional timeout for HTTP requests to the endpoint type: string tls: @@ -350,19 +451,71 @@ spec: status: description: TargetSourceStatus defines the observed state of TargetSource properties: + conditions: + items: + description: Condition contains details for one aspect of the current + state of this API Resource. + properties: + lastTransitionTime: + description: |- + lastTransitionTime is the last time the condition transitioned from one status to another. + This should be when the underlying condition changed. If that is not known, then using the time when the API field changed is acceptable. + format: date-time + type: string + message: + description: |- + message is a human readable message indicating details about the transition. + This may be an empty string. + maxLength: 32768 + type: string + observedGeneration: + description: |- + observedGeneration represents the .metadata.generation that the condition was set based upon. + For instance, if .metadata.generation is currently 12, but the .status.conditions[x].observedGeneration is 9, the condition is out of date + with respect to the current state of the instance. + format: int64 + minimum: 0 + type: integer + reason: + description: |- + reason contains a programmatic identifier indicating the reason for the condition's last transition. + Producers of specific condition types may define expected values and meanings for this field, + and whether the values are considered a guaranteed API. + The value should be a CamelCase string. + This field may not be empty. + maxLength: 1024 + minLength: 1 + pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$ + type: string + status: + description: status of the condition, one of True, False, Unknown. + enum: + - "True" + - "False" + - Unknown + type: string + type: + description: type of condition in CamelCase or in foo.example.com/CamelCase. + maxLength: 316 + pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$ + type: string + required: + - lastTransitionTime + - message + - reason + - status + - type + type: object + type: array lastSync: format: date-time type: string observedGeneration: format: int64 type: integer - status: - type: string targetsCount: format: int32 type: integer - required: - - observedGeneration type: object type: object served: true diff --git a/go.mod b/go.mod index 9dc2b789..c08b9b8d 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.5 require ( github.com/cert-manager/cert-manager v1.19.3 github.com/go-logr/logr v1.4.3 + github.com/google/cel-go v0.28.1 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.28.3 github.com/onsi/gomega v1.40.0 @@ -19,8 +20,10 @@ require ( ) require ( + cel.dev/expr v0.25.1 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/Masterminds/semver/v3 v3.4.0 // indirect + github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -63,6 +66,7 @@ require ( go.uber.org/zap v1.27.1 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 // indirect golang.org/x/mod v0.35.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect @@ -73,6 +77,7 @@ require ( golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.44.0 // indirect gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect diff --git a/go.sum b/go.sum index 45485f13..0a845c4b 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ +cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4= +cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs= cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= +github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= +github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cert-manager/cert-manager v1.19.3 h1:3d0Nk/HO3BOmAdBJNaBh+6YgaO3Ciey3xCpOjiX5Obs= @@ -76,6 +80,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/cel-go v0.28.1 h1:YWIwi77J4xIsYUwAF/iIuS6haffzIHS8yWI8glSbLWM= +github.com/google/cel-go v0.28.1/go.mod h1:X0bD6iVNR8pkROSOoHVdgTkzmRcosof7WQqCD6wcMc8= github.com/google/gnostic-models v0.7.1 h1:SisTfuFKJSKM5CPZkffwi6coztzzeYUhc3v4yxLWH8c= github.com/google/gnostic-models v0.7.1/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= @@ -172,6 +178,8 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 h1:R9PFI6EUdfVKgwKjZef7QIwGcBKu86OEFpJ9nUEP2l4= +golang.org/x/exp v0.0.0-20250718183923-645b1fa84792/go.mod h1:A+z0yzpGtvnG90cToK5n2tu8UJVP2XUATh+r+sfOOOc= golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM= golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU= golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA= @@ -194,6 +202,8 @@ gomodules.xyz/jsonpatch/v2 v2.5.0 h1:JELs8RLM12qJGXU4u/TO3V25KW8GreMKl9pdkk14RM0 gomodules.xyz/jsonpatch/v2 v2.5.0/go.mod h1:AH3dM2RI6uoBZxn3LVrfvJ3E0/9dG4cSrbuBJT4moAY= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b h1:Mv8VFug0MP9e5vUxfBcE3vUkV6CImK3cMNMIDFjmzxU= google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= diff --git a/internal/controller/discovery/client.go b/internal/controller/discovery/client.go index e5cc5ea0..30085aea 100644 --- a/internal/controller/discovery/client.go +++ b/internal/controller/discovery/client.go @@ -7,14 +7,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" ) -func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource) ([]gnmicv1alpha1.Target, error) { +func fetchExistingTargets( + ctx context.Context, + c client.Client, + ts *gnmicv1alpha1.TargetSource, +) ([]gnmicv1alpha1.Target, error) { + var targetList gnmicv1alpha1.TargetList err := c.List( @@ -32,7 +36,7 @@ func fetchExistingTargets(ctx context.Context, c client.Client, ts *gnmicv1alpha return targetList.Items, nil } -func applyTarget(ctx context.Context, c client.Client, s *runtime.Scheme, desired *gnmicv1alpha1.Target, ts *gnmicv1alpha1.TargetSource) error { +func applyTarget(ctx context.Context, c client.Client, s *runtime.Scheme, desired *gnmicv1alpha1.Target, ts *gnmicv1alpha1.TargetSource) (controllerutil.OperationResult, error) { existing := &gnmicv1alpha1.Target{ ObjectMeta: metav1.ObjectMeta{ Name: desired.Name, @@ -40,14 +44,14 @@ func applyTarget(ctx context.Context, c client.Client, s *runtime.Scheme, desire }, } - _, err := controllerutil.CreateOrUpdate(ctx, c, existing, func() error { + result, err := controllerutil.CreateOrUpdate(ctx, c, existing, func() error { existing.Spec = desired.Spec existing.Labels = desired.Labels return controllerutil.SetControllerReference(ts, existing, s) }) - return err + return result, err } func deleteTarget(ctx context.Context, c client.Client, name string, namespace string) error { @@ -70,20 +74,3 @@ func deleteTarget(ctx context.Context, c client.Client, name string, namespace s return err } - -// updateTargetSourceStatus updates the status of the TargetSource Object ts. The only fields updated are targetCount and LastSync, which takes the current timestamp. -func updateTargetSourceStatus(ctx context.Context, c client.Client, ts *gnmicv1alpha1.TargetSource, targetCount int32) error { - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - latest := &gnmicv1alpha1.TargetSource{} - if err := c.Get(ctx, client.ObjectKeyFromObject(ts), latest); err != nil { - return err - } - - latest.Status.TargetsCount = targetCount - latest.Status.LastSync = metav1.Now() - - return c.Status().Update(ctx, latest) - }) - - return err -} diff --git a/internal/controller/discovery/core/ressource_fetcher_interface.go b/internal/controller/discovery/core/ressource_fetcher_interface.go new file mode 100644 index 00000000..31a82cf0 --- /dev/null +++ b/internal/controller/discovery/core/ressource_fetcher_interface.go @@ -0,0 +1,15 @@ +package core + +import ( + "context" + + corev1 "k8s.io/api/core/v1" +) + +// ResourceFetcher provides read-only access to namespaced Secret and +// ConfigMap data for loaders without requiring each loader to carry a +// Kubernetes client. +type ResourceFetcher interface { + GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) + GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) +} diff --git a/internal/controller/discovery/core/status_updater_interface.go b/internal/controller/discovery/core/status_updater_interface.go new file mode 100644 index 00000000..35a7c3dc --- /dev/null +++ b/internal/controller/discovery/core/status_updater_interface.go @@ -0,0 +1,33 @@ +package core + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + ConditionTypeReady = "Ready" + ConditionTypeReconciling = "Reconciling" + ConditionTypeDegraded = "Degraded" + ConditionTypeStalled = "Stalled" + + ReasonWaitingForSync Reason = "WaitingForSync" + ReasonSyncStarted Reason = "SyncStarted" + ReasonSyncSucceeded Reason = "SyncSucceeded" + ReasonSyncCompleted Reason = "SyncCompleted" + ReasonSyncWithErrors Reason = "SyncSucceededWithErrors" + ReasonSyncFailed Reason = "SyncFailed" +) + +type Reason string + +type StatusUpdate struct { + Conditions []metav1.Condition + TargetsCount *int32 +} + +// StatusUpdater defines the interface for TargetLoaders and MessageProcessor to update the status of the TargetSource +type StatusUpdater interface { + UpdateStatus(context.Context, StatusUpdate) error +} diff --git a/internal/controller/discovery/core/types.go b/internal/controller/discovery/core/types.go index 8de38c1d..30726325 100644 --- a/internal/controller/discovery/core/types.go +++ b/internal/controller/discovery/core/types.go @@ -19,9 +19,11 @@ type DiscoveryRegistryValue struct { } type CommonLoaderConfig struct { - TargetsourceNN types.NamespacedName - ChunkSize int - AcceptPush bool + TargetsourceNN types.NamespacedName + ChunkSize int + AcceptPush bool + Updater StatusUpdater + ResourceFetcher ResourceFetcher } // EventAction represents the type of a discovery event diff --git a/internal/controller/discovery/discovery.go b/internal/controller/discovery/discovery.go index 491cdfb3..07c9ceda 100644 --- a/internal/controller/discovery/discovery.go +++ b/internal/controller/discovery/discovery.go @@ -10,6 +10,12 @@ package discovery // - core: message contracts, snapshot/event types, and transport helpers. // - message processor: snapshot + event target state application logic. // - loaders: target discovery providers (HTTP, webhook, etc.). -// - registry: key -> channel registry. +// - registry: generic discovery runtime registry. +// +// The package also contains discovery helpers: +// - client helpers for applying/deleting targets and updating TargetSource status. +// - a loader factory for constructing discovery loaders. +// - target normalization and event generation logic. +// - a resource fetcher for resolving Secret/ConfigMap values used by loaders. // // At the moment, the targetsource controller imports specific subpackages explicitly. diff --git a/internal/controller/discovery/loaders.go b/internal/controller/discovery/loaders.go index e8061d93..2644db3b 100644 --- a/internal/controller/discovery/loaders.go +++ b/internal/controller/discovery/loaders.go @@ -1,24 +1,26 @@ package discovery import ( + "context" "fmt" + "sigs.k8s.io/controller-runtime/pkg/client" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" http "github.com/gnmic/operator/internal/controller/discovery/loaders/http" ) // NewLoader creates a loader by name -func NewLoader(cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { +func NewLoader(ctx context.Context, c client.Client, cfg *core.CommonLoaderConfig, spec gnmicv1alpha1.TargetSourceSpec) (core.Loader, error) { switch { case spec.Provider.HTTP != nil: - if spec.Provider.HTTP.Push != nil { - cfg.AcceptPush = spec.Provider.HTTP.Push.Enabled - } - return http.New(*cfg), nil + httpSpec := *spec.Provider.HTTP + cfg.AcceptPush = httpSpec.Push != nil && httpSpec.Push.Enabled + cfg.ResourceFetcher = newK8sResourceFetcher(c) + return http.New(*cfg, httpSpec), nil default: return nil, fmt.Errorf("unknown targetsource provider, check TargetSource CRD for %s", cfg.TargetsourceNN) } - } diff --git a/internal/controller/discovery/loaders/http/auth.go b/internal/controller/discovery/loaders/http/auth.go new file mode 100644 index 00000000..04f48f7e --- /dev/null +++ b/internal/controller/discovery/loaders/http/auth.go @@ -0,0 +1,78 @@ +package http + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + corev1 "k8s.io/api/core/v1" +) + +// fetchSecret uses the configured ResourceFetcher to resolve secret values. +func (l *Loader) fetchSecret(ctx context.Context, sel *corev1.SecretKeySelector) (string, error) { + if l.loaderCfg.ResourceFetcher == nil { + return "", nil + } + return l.loaderCfg.ResourceFetcher.GetSecretKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, sel) +} + +func (l *Loader) applyAuthentication(req *http.Request) error { + auth := l.spec.Authentication + if auth == nil { + return nil + } + + if auth.Basic != nil { + return l.applyBasicAuth(req, auth.Basic.CredentialSecretRef) + } + + if auth.Token != nil { + return l.applyTokenAuth(req, auth.Token.Scheme, auth.Token.TokenSecretRef) + } + + return fmt.Errorf("no supported authentication method configured") +} + +// applyBasicAuth applies Basic authentication using the provided secret selector. +// Returns an error when credentials are missing or cannot be parsed. +func (l *Loader) applyBasicAuth(req *http.Request, sel *corev1.SecretKeySelector) error { + if sel == nil { + return fmt.Errorf("Basic auth enabled but no valid credentials provided") + } + + val, err := l.fetchSecret(req.Context(), sel) + if err != nil { + return err + } + + var cm map[string]string + if err := json.Unmarshal([]byte(val), &cm); err != nil { + return err + } + + username := cm["username"] + password := cm["password"] + if username == "" && password == "" { + return fmt.Errorf("Basic auth enabled but no valid credentials provided") + } + + req.SetBasicAuth(username, password) + return nil +} + +// applyTokenAuth applies token-based authentication using the provided secret selector +// Returns an error when no valid token is found +func (l *Loader) applyTokenAuth(req *http.Request, scheme string, sel *corev1.SecretKeySelector) error { + if sel == nil { + return fmt.Errorf("Token auth enabled but no valid token secret reference provided") + } + + token, err := l.fetchSecret(req.Context(), sel) + if err != nil { + return err + } + + req.Header.Set("Authorization", fmt.Sprintf("%s %s", scheme, token)) + return nil +} diff --git a/internal/controller/discovery/loaders/http/auth_test.go b/internal/controller/discovery/loaders/http/auth_test.go new file mode 100644 index 00000000..24fc821f --- /dev/null +++ b/internal/controller/discovery/loaders/http/auth_test.go @@ -0,0 +1,91 @@ +package http + +import ( + "encoding/json" + "net/http" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +func TestApplyAuthenticationCases(t *testing.T) { + credsJSON, _ := json.Marshal(map[string]string{"username": "user", "password": "pass"}) + + tests := []struct { + name string + config gnmicv1alpha1.HTTPConfig + fetcher core.ResourceFetcher + check func(t *testing.T, req *http.Request, err error) + }{ + { + name: "basic success", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Basic: &gnmicv1alpha1.BasicAuthSpec{CredentialSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: string(credsJSON)}, + check: func(t *testing.T, req *http.Request, err error) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + user, pass, ok := req.BasicAuth() + if !ok || user != "user" || pass != "pass" { + t.Fatalf("basic auth not set correctly") + } + }, + }, + { + name: "basic invalid json", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Basic: &gnmicv1alpha1.BasicAuthSpec{CredentialSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: "invalid-json"}, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected error for invalid json") + } + }, + }, + { + name: "token success", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Token: &gnmicv1alpha1.TokenAuthSpec{Scheme: "Bearer", TokenSecretRef: &corev1.SecretKeySelector{}}}}, + fetcher: fakeResourceFetcher{secretValue: "token-value"}, + check: func(t *testing.T, req *http.Request, err error) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := req.Header.Get("Authorization"); !strings.Contains(got, "token-value") { + t.Fatalf("token header not set: %q", got) + } + }, + }, + { + name: "token missing secret", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{Token: &gnmicv1alpha1.TokenAuthSpec{Scheme: "Bearer"}}}, + fetcher: nil, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected token secret ref error") + } + }, + }, + { + name: "no method configured", + config: gnmicv1alpha1.HTTPConfig{Authentication: &gnmicv1alpha1.AuthenticationSpec{}}, + fetcher: nil, + check: func(t *testing.T, req *http.Request, err error) { + if err == nil { + t.Fatalf("expected unsupported auth error") + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + loader := makeLoader(tt.config, tt.fetcher) + req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil) + err := loader.applyAuthentication(req) + tt.check(t, req, err) + }) + } +} diff --git a/internal/controller/discovery/loaders/http/helpers_test.go b/internal/controller/discovery/loaders/http/helpers_test.go new file mode 100644 index 00000000..240e3d40 --- /dev/null +++ b/internal/controller/discovery/loaders/http/helpers_test.go @@ -0,0 +1,112 @@ +package http + +import ( + "bytes" + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "net/http" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// fakeResourceFetcher is a lightweight test double. +type fakeResourceFetcher struct { + secretValue string + configuration string + secretErr error + configMapErr error +} + +type fakeStatusUpdater struct { +} + +func (f fakeResourceFetcher) GetSecretKey(_ context.Context, _ string, _ *corev1.SecretKeySelector) (string, error) { + return f.secretValue, f.secretErr +} + +func (f fakeResourceFetcher) GetConfigMapKey(_ context.Context, _ string, _ *corev1.ConfigMapKeySelector) (string, error) { + return f.configuration, f.configMapErr +} + +func (f fakeStatusUpdater) UpdateStatus(ctx context.Context, update core.StatusUpdate) error { + return nil +} + +func makeLoader(spec gnmicv1alpha1.HTTPConfig, fetcher core.ResourceFetcher) *Loader { + if spec.Method == "" { + spec.Method = http.MethodGet + } + if spec.Interval == nil { + spec.Interval = &metav1.Duration{Duration: 6 * time.Hour} + } + return &Loader{ + loaderCfg: core.CommonLoaderConfig{ + TargetsourceNN: types.NamespacedName{Namespace: "default", Name: "test"}, + ChunkSize: 10, + Updater: newFakeStatusUpdater(), + ResourceFetcher: fetcher, + }, + spec: spec, + } +} + +func mustBuildClient(t *testing.T, loader *Loader) *http.Client { + t.Helper() + client, err := loader.buildHTTPClient(context.Background()) + if err != nil { + t.Fatalf("buildHTTPClient failed: %v", err) + } + return client +} + +func startLoaderRun(loader *Loader) (context.Context, context.CancelFunc, chan []core.DiscoveryMessage, chan error) { + ctx, cancel := context.WithCancel(context.Background()) + out := make(chan []core.DiscoveryMessage, 1) + done := make(chan error, 1) + go func() { done <- loader.Run(ctx, out) }() + return ctx, cancel, out, done +} + +// genSelfSignedCertPEM generates a self-signed certificate PEM used in tests. +func genSelfSignedCertPEM() (string, error) { + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return "", err + } + tmpl := x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{ + CommonName: "test-ca", + }, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(24 * time.Hour), + KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageDigitalSignature, + BasicConstraintsValid: true, + IsCA: true, + } + der, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &priv.PublicKey, priv) + if err != nil { + return "", err + } + var buf bytes.Buffer + if err := pem.Encode(&buf, &pem.Block{Type: "CERTIFICATE", Bytes: der}); err != nil { + return "", err + } + return buf.String(), nil +} + +func newFakeStatusUpdater() core.StatusUpdater { + return fakeStatusUpdater{} +} diff --git a/internal/controller/discovery/loaders/http/loader.go b/internal/controller/discovery/loaders/http/loader.go index 6b85a9bb..860a6f74 100644 --- a/internal/controller/discovery/loaders/http/loader.go +++ b/internal/controller/discovery/loaders/http/loader.go @@ -1,49 +1,149 @@ package http import ( + "bytes" "context" + "crypto/tls" + "crypto/x509" + "encoding/json" "fmt" + "net/http" "time" + "github.com/go-logr/logr" + "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/log" + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" "github.com/gnmic/operator/internal/controller/discovery/core" loaderUtils "github.com/gnmic/operator/internal/controller/discovery/loaders/utils" - "github.com/google/uuid" ) +// Loader implements the HTTP pull discovery mechanism +// It periodically polls an HTTP endpoint, extracts targets from the response, +// and emits discovery snapshots downstream type Loader struct { - commonCfg core.CommonLoaderConfig + loaderCfg core.CommonLoaderConfig + spec gnmicv1alpha1.HTTPConfig } -// New instantiates the http loader with the provided config -func New(cfg core.CommonLoaderConfig) core.Loader { - return &Loader{commonCfg: cfg} +// New creates a new HTTP loader instance with the provided configuration. +// The loader is stateless apart from its config and spec +func New(cfg core.CommonLoaderConfig, httpConfig gnmicv1alpha1.HTTPConfig) core.Loader { + return &Loader{loaderCfg: cfg, spec: httpConfig} } +// Name returns the loader's name, used for logging and metrics func (l *Loader) Name() string { return "http" } +// Run starts the HTTP discovery loop +// It performs an immediate fetch and then continues polling at a fixed interval func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) error { + if l.spec.URL == "" { + return nil + } + logger := log.FromContext(ctx).WithValues( "component", "loader", "name", l.Name(), - "targetsource", l.commonCfg.TargetsourceNN, + "targetsource", l.loaderCfg.TargetsourceNN, ) logger.Info( "HTTP loader started", - "targetsource", l.commonCfg.TargetsourceNN.Name, - "namespace", l.commonCfg.TargetsourceNN.Namespace, + "targetsource", l.loaderCfg.TargetsourceNN.Name, + "namespace", l.loaderCfg.TargetsourceNN.Namespace, ) - // Only for debugging: emit a static snapshot every 30 seconds - ticker := time.NewTicker(30 * time.Second) + logger.Info("HTTP loader started") + + client, err := l.buildHTTPClient(ctx) + if err != nil { + return fmt.Errorf("failed to build HTTP client: %w", err) + } + if l.spec.Interval == nil { + return fmt.Errorf("interval must be configured") + } + interval := l.spec.Interval.Duration + ticker := time.NewTicker(interval) defer ticker.Stop() - i := 1 + logger.Info( + "HTTP polling discovery started", + "interval", interval.String(), + "url", l.spec.URL, + ) + + // helper function to fetch targets and emit discovery messages + fetchAndEmit := func() { + // Set TargetSource conditions to "Reconciling" + l.loaderCfg.Updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReconciling, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncStarted), + Message: "Started fetching target source", + }, + }, + }, + ) + + // Fetch targets from HTTP endpoint + targets, err := l.fetchTargetsFromHTTPEndpoint(ctx, client, logger) + if err != nil { + logger.Error( + err, + "Failed to fetch targets from HTTP endpoint", + "url", l.spec.URL, + ) + // Set TargetSource conditions to "Stalled" if endpoint is not available + l.loaderCfg.Updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeStalled, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncFailed), + Message: "HTTP endpoint not available", + }, + }, + }, + ) + return + } + + // Emit discovery snapshot downstream + snapshotID := fmt.Sprintf("%s-%s-%s", l.loaderCfg.TargetsourceNN.Namespace, l.loaderCfg.TargetsourceNN.Name, uuid.NewString()) + if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.loaderCfg.ChunkSize); err != nil { + logger.Error( + err, + "Failed to send discovery snapshot", + "snapshotID", snapshotID, + "targets", len(targets), + ) + return + } + + logger.Info( + "Discovery snapshot sent", + "snapshotID", snapshotID, + "targets", len(targets), + ) + } + + // Immediate fetch on startup + fetchAndEmit() + + // Periodic fetch for { select { case <-ctx.Done(): @@ -51,78 +151,242 @@ func (l *Loader) Run(ctx context.Context, out chan<- []core.DiscoveryMessage) er return nil case <-ticker.C: - // Switch case + i only needed to test behavior for messages with different values. - switch i { - case 1: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf1", - Address: "clab-leaf1", - Port: 57400, - Labels: map[string]string{}, - }, - } - - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - case 2: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf2", - Address: "clab-t1-leaf2", - Port: 57400, - Labels: map[string]string{}, - }, - } + fetchAndEmit() + } + } +} - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } +// buildHTTPClient constructs an HTTP client with optional configuration +func (l *Loader) buildHTTPClient(ctx context.Context) (*http.Client, error) { + if l.spec.Timeout == nil { + return nil, fmt.Errorf("timeout must be configured") + } + timeout := l.spec.Timeout.Duration + transport := &http.Transport{} + // If TLS is configured, build TLS config (may include CA bundle). + if l.spec.TLS != nil { + tlsConfig, err := l.buildTLSConfig(ctx) + if err != nil { + return nil, err + } + transport.TLSClientConfig = tlsConfig + } - default: - snapshotID := fmt.Sprintf("%s-%s-%s", l.commonCfg.TargetsourceNN.Namespace, l.commonCfg.TargetsourceNN.Name, uuid.NewString()) - targets := []core.DiscoveredTarget{ - { - Name: "spine1", - Address: "clab-t1-spine1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf1", - Address: "clab-t1-leaf1", - Port: 57400, - Labels: map[string]string{}, - }, - { - Name: "leaf2", - Address: "clab-t1-leaf2", - Port: 57400, - Labels: map[string]string{}, - }, - } + // Build the HTTP client with the specified timeout and TLS config + client := &http.Client{ + Timeout: timeout, + Transport: transport, + } + return client, nil +} - if err := loaderUtils.SendSnapshot(ctx, out, targets, snapshotID, l.commonCfg.ChunkSize); err != nil { - return err - } - } +// buildTLSConfig constructs a tls.Config according to the loader spec, +// fetching and parsing a CA bundle if requested. +func (l *Loader) buildTLSConfig(ctx context.Context) (*tls.Config, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: l.spec.TLS.InsecureSkipVerify, + } - i++ + if l.spec.TLS.CABundleRef == nil { + return tlsConfig, nil + } + + if l.loaderCfg.ResourceFetcher == nil { + return nil, fmt.Errorf("resource fetcher is not configured") + } + + ref := l.spec.TLS.CABundleRef + if ref.Name == "" || ref.Key == "" { + return nil, fmt.Errorf("CABundleRef must specify both name and key") + } + + caPEM, err := l.loaderCfg.ResourceFetcher.GetConfigMapKey(ctx, l.loaderCfg.TargetsourceNN.Namespace, l.spec.TLS.CABundleRef) + if err != nil { + return nil, fmt.Errorf("failed to fetch CA bundle from config map ref: %w", err) + } + + certPool := x509.NewCertPool() + if ok := certPool.AppendCertsFromPEM([]byte(caPEM)); !ok { + return nil, fmt.Errorf("failed to parse CA bundle PEM") + } + tlsConfig.RootCAs = certPool + + return tlsConfig, nil +} + +// fetchTargetsFromHTTPEndpoint retrieves targets from the configured HTTP endpoint +func (l *Loader) fetchTargetsFromHTTPEndpoint( + ctx context.Context, + client *http.Client, + logger logr.Logger, +) ([]core.DiscoveredTarget, error) { + var allTargets []core.DiscoveredTarget + currentURL := l.spec.URL + + seen := make(map[string]struct{}) + + for { + if _, exists := seen[currentURL]; exists { + logger.Error(fmt.Errorf("pagination loop detected"), "stopping pagination", "url", currentURL) + break + } + seen[currentURL] = struct{}{} + + raw, headers, err := l.fetchPage(ctx, client, currentURL) + if err != nil { + return allTargets, err // do not silently drop pages + } + + // Extract targets + if targets, err := l.extractTargetsFromResponse(raw, logger); err != nil { + logger.Error(err, "Failed to extract targets", "url", currentURL) + } else { + allTargets = append(allTargets, targets...) } + + // Pagination: next page + nextURL, stop := l.getNextURL(raw, headers, currentURL, logger) + if stop { + break + } + currentURL = nextURL + } + + return allTargets, nil +} + +// fetchPage performs an HTTP GET request to the specified URL and decodes the JSON response +// and returns the raw response +func (l *Loader) fetchPage( + ctx context.Context, + client *http.Client, + url string, +) (any, http.Header, error) { + + method := l.spec.Method + if method == "" { + return nil, nil, fmt.Errorf("method must be configured") + } + + // Build request body (only for POST) + var bodyReader *bytes.Reader + if method == http.MethodPost && l.spec.Body != "" { + bodyReader = bytes.NewReader([]byte(l.spec.Body)) + } else { + bodyReader = bytes.NewReader(nil) + } + + // Build HTTP request + req, err := http.NewRequestWithContext(ctx, method, url, bodyReader) + if err != nil { + return nil, nil, fmt.Errorf("creating HTTP request failed: %w", err) + } + + req.Header.Set("Accept", "application/json") + // Apply user-defined headers + for key, val := range l.spec.Headers { + req.Header.Set(key, val) + } + + if err := l.applyAuthentication(req); err != nil { + return nil, nil, err + } + + // Execute HTTP request + resp, err := client.Do(req) + if err != nil { + return nil, nil, err } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, resp.Header, fmt.Errorf("unexpected HTTP status: %d", resp.StatusCode) + } + + // Decode HTTP response + var raw any + if err := json.NewDecoder(resp.Body).Decode(&raw); err != nil { + return nil, resp.Header, err + } + + return raw, resp.Header, nil +} + +// extractTargetsFromResponse extracts items from the response and maps each item into a DiscoveredTarget +func (l *Loader) extractTargetsFromResponse(raw any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + var items []any + // If ResponseMapping is configured and TargetsField is provided we treat + // it as a CEL expression that evaluates against the whole response and + // must return an array of items. + if l.spec.ResponseMapping != nil && l.spec.ResponseMapping.TargetsField != "" { + prog, err := compileCEL(l.spec.ResponseMapping.TargetsField) + if err != nil { + return nil, fmt.Errorf("invalid TargetsField CEL expression: %w", err) + } + out, _, err := prog.Eval(map[string]any{"self": raw}) + if err != nil { + return nil, fmt.Errorf("evaluating TargetsField CEL expression failed: %w", err) + } + if out == nil { + return nil, fmt.Errorf("TargetsField expression returned nil") + } + array, ok := out.Value().([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: targetsField expression must evaluate to an array of objects") + } + items = array + } else { + //If TargetsField is empty, the raw response is expected to be an array of items. + array, ok := raw.([]any) + if !ok { + return nil, fmt.Errorf("invalid HTTP response: expected a JSON array when itemsField is not set") + } + items = array + } + + // Map items to targets + var targets []core.DiscoveredTarget + targets, err := l.mapItemsToTargets(items, raw, logger) + if err != nil { + return nil, fmt.Errorf("mapping items to targets failed: %w", err) + } + + return targets, nil +} + +// getNextURL determines the next page URL +// Returns: +// - nextURL: next request +// - stop: whether to terminate loop +func (l *Loader) getNextURL( + raw any, + headers http.Header, + currentURL string, + logger logr.Logger, +) (string, bool) { + // Extract pagination info + // Link header + if next := extractNextFromLinkHeader(headers); next != "" { + return next, false + } + + // Body + nextPage, err := l.extractNextPageInfo(raw) + if err != nil { + logger.Error(err, "pagination extraction failed") + return "", true + } + + if nextPage == "" { + return "", true + } + + // Build next page URL + nextURL, err := l.buildNextURL(currentURL, nextPage) + if err != nil { + logger.Error(err, "failed to build next URL") + return "", true + } + + return nextURL, false } diff --git a/internal/controller/discovery/loaders/http/loader_test.go b/internal/controller/discovery/loaders/http/loader_test.go index d02cfda6..2a42e1b2 100644 --- a/internal/controller/discovery/loaders/http/loader_test.go +++ b/internal/controller/discovery/loaders/http/loader_test.go @@ -1 +1,204 @@ package http + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +func TestBuildHTTPClientCases(t *testing.T) { + caPEM, err := genSelfSignedCertPEM() + if err != nil { + t.Fatalf("failed to generate CA PEM: %v", err) + } + + tests := []struct { + name string + spec gnmicv1alpha1.HTTPConfig + fetcher core.ResourceFetcher + expectsErr bool + }{ + { + name: "valid_CABundle", + spec: gnmicv1alpha1.HTTPConfig{ + TLS: &gnmicv1alpha1.ClientTLSConfig{ + CABundleRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "test-ca"}, + Key: "ca.crt", + }, + }, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, + fetcher: fakeResourceFetcher{configuration: caPEM}, + expectsErr: false, + }, + { + name: "invalid_CABundle_PEM", + spec: gnmicv1alpha1.HTTPConfig{ + TLS: &gnmicv1alpha1.ClientTLSConfig{CABundleRef: &corev1.ConfigMapKeySelector{}}, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, + fetcher: fakeResourceFetcher{configuration: "not-pem"}, + expectsErr: true, + }, + { + name: "CABundle_without_fetcher", + spec: gnmicv1alpha1.HTTPConfig{TLS: &gnmicv1alpha1.ClientTLSConfig{CABundleRef: &corev1.ConfigMapKeySelector{}}, Timeout: &metav1.Duration{Duration: 10 * time.Second}}, + fetcher: nil, + expectsErr: true, + }, + { + name: "timeout_missing", + spec: gnmicv1alpha1.HTTPConfig{}, + fetcher: nil, + expectsErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + loader := makeLoader(tc.spec, tc.fetcher) + client, err := loader.buildHTTPClient(context.Background()) + if tc.expectsErr { + if err == nil { + t.Fatalf("%s: expected error, got nil", tc.name) + } + return + } + if err != nil { + t.Fatalf("%s: unexpected error: %v", tc.name, err) + } + if client == nil { + t.Fatalf("%s: expected client, got nil", tc.name) + } + }) + } +} + +func TestFetchPageErrorsAndJSON(t *testing.T) { + loader := &Loader{ + loaderCfg: core.CommonLoaderConfig{TargetsourceNN: types.NamespacedName{Namespace: "default", Name: "test"}}, + spec: gnmicv1alpha1.HTTPConfig{Timeout: &metav1.Duration{Duration: 10 * time.Second}}, + } + + // method missing + if _, _, err := loader.fetchPage(context.Background(), nil, "http://example.com"); err == nil { + t.Fatalf("expected method configuration error") + } + + // non-200 and invalid JSON + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("boom")) + })) + defer server.Close() + + loader = makeLoader(gnmicv1alpha1.HTTPConfig{ + Method: http.MethodGet, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + }, nil) + + client := mustBuildClient(t, loader) + + // non-200 + if _, _, err := loader.fetchPage(context.Background(), client, server.URL); err == nil { + t.Fatalf("expected status code error") + } + + // invalid JSON + server.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("not-json")) + }) + + if _, _, err := loader.fetchPage(context.Background(), client, server.URL); err == nil { + t.Fatalf("expected JSON decode error") + } +} + +func TestFetchPagePOSTAndHeaders(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // validate method and headers/body + if r.Method != http.MethodPost { + t.Fatalf("expected POST, got %s", r.Method) + } + if r.Header.Get("X-Custom") != "value" { + t.Fatalf("missing header") + } + + var body map[string]any + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + t.Fatalf("body decode failed: %v", err) + } + + json.NewEncoder(w).Encode(map[string]any{"name": "target1"}) + })) + defer server.Close() + + spec := gnmicv1alpha1.HTTPConfig{ + URL: server.URL, + Method: http.MethodPost, + Headers: map[string]string{"X-Custom": "value"}, + Body: `{"query":"status"}`, + Timeout: &metav1.Duration{Duration: 10 * time.Second}, + } + + loader := makeLoader(spec, nil) + client := mustBuildClient(t, loader) + + raw, headers, err := loader.fetchPage(context.Background(), client, server.URL) + if err != nil { + t.Fatalf("fetchPage failed: %v", err) + } + + if headers == nil { + t.Fatalf("expected headers, got nil") + } + + resp, ok := raw.(map[string]any) + if !ok || resp["name"] != "target1" { + t.Fatalf("unexpected response: %#v", raw) + } +} + +func TestRunEmitsSnapshotOnImmediateFetch(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + json.NewEncoder(w).Encode([]any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": float64(830)}}) + })) + defer server.Close() + + spec := gnmicv1alpha1.HTTPConfig{URL: server.URL, Method: http.MethodGet, Timeout: &metav1.Duration{Duration: 10 * time.Second}, Interval: &metav1.Duration{Duration: time.Hour}} + loader := makeLoader(spec, nil) + + _, cancel, out, done := startLoaderRun(loader) + defer cancel() + + select { + case msgs := <-out: + if len(msgs) == 0 { + t.Fatalf("expected discovery messages") + } + cancel() + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Run to emit snapshot") + } + + select { + case err := <-done: + if err != nil { + t.Fatalf("Run returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for Run to return") + } +} diff --git a/internal/controller/discovery/loaders/http/mapper.go b/internal/controller/discovery/loaders/http/mapper.go new file mode 100644 index 00000000..4bd34585 --- /dev/null +++ b/internal/controller/discovery/loaders/http/mapper.go @@ -0,0 +1,329 @@ +package http + +import ( + "fmt" + "math" + "reflect" + "strconv" + + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" + "github.com/google/cel-go/cel" + "github.com/google/cel-go/common/types/ref" + "github.com/google/cel-go/ext" +) + +// mapItemsToTargets converts a list of raw JSON items into DiscoveredTargets using the configured mapping rules +func (l *Loader) mapItemsToTargets(items []any, full any, logger logr.Logger) ([]core.DiscoveredTarget, error) { + // Compile CEL expressions once for efficiency + compiled, err := l.compileMapping() + if err != nil { + return nil, fmt.Errorf("compile mapping: %w", err) + } + + // Map items to targets + targets := make([]core.DiscoveredTarget, 0, len(items)) + for _, item := range items { + obj, ok := item.(map[string]any) + if !ok { + logger.Error(fmt.Errorf("invalid target format"), + "failed to convert target to map", + "item", item, + ) + continue + } + target, err := l.mapItemToTarget(obj, full, compiled) + if err != nil { + logger.Error(err, + "failed to map target", + "item", obj, + ) + continue + } + + targets = append(targets, target) + } + + return targets, nil +} + +type compiledMapping struct { + name cel.Program + address cel.Program + port cel.Program + + targetProfile cel.Program + labels cel.Program +} + +func (l *Loader) compileMapping() (*compiledMapping, error) { + rm := l.spec.ResponseMapping + cm := &compiledMapping{} + if rm == nil { + return cm, nil + } + + var err error + if rm.Name != "" { + cm.name, err = compileCEL(rm.Name) + if err != nil { + return nil, fmt.Errorf("name: %w", err) + } + } + if rm.Address != "" { + cm.address, err = compileCEL(rm.Address) + if err != nil { + return nil, fmt.Errorf("address: %w", err) + } + } + if rm.Port != "" { + cm.port, err = compileCEL(rm.Port) + if err != nil { + return nil, fmt.Errorf("port: %w", err) + } + } + if rm.TargetProfile != "" { + cm.targetProfile, err = compileCEL(rm.TargetProfile) + if err != nil { + return nil, fmt.Errorf("targetProfile: %w", err) + } + } + if rm.Labels != "" { + cm.labels, err = compileCEL(rm.Labels) + if err != nil { + return nil, fmt.Errorf("labels: %w", err) + } + } + + return cm, nil +} + +// mapItemToTarget converts a raw JSON object into a DiscoveredTarget +func (l *Loader) mapItemToTarget(item map[string]any, full any, cm *compiledMapping) (core.DiscoveredTarget, error) { + name, err := l.getName(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + address, err := l.getAddress(item, full, cm) + if err != nil { + return core.DiscoveredTarget{}, err + } + + return core.DiscoveredTarget{ + Name: name, + Address: address, + Port: l.getPort(item, full, cm), + Labels: l.getLabels(item, full, cm), + TargetProfile: l.getTargetProfile(item, full, cm), + }, nil +} + +// getName extracts the target name from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "name" field +func (l *Loader) getName(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.name != nil { + val, err := evalCEL(cm.name, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return str, nil + } + + val, ok := item["name"].(string) + if !ok || val == "" { + return "", fmt.Errorf("name must be non-empty string") + } + return val, nil +} + +// getAddress extracts the target address from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "address" field +func (l *Loader) getAddress(item map[string]any, full any, cm *compiledMapping) (string, error) { + if cm.address != nil { + val, err := evalCEL(cm.address, item, full) + if err != nil { + return "", err + } + + str, ok := val.(string) + if !ok || str == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return str, nil + } + + val, ok := item["address"].(string) + if !ok || val == "" { + return "", fmt.Errorf("address must be non-empty string") + } + return val, nil +} + +// getPort extracts the target port from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "port" field +func (l *Loader) getPort(item map[string]any, full any, cm *compiledMapping) int32 { + if cm.port != nil { + val, err := evalCEL(cm.port, item, full) + if err == nil { + return extractPort(val) + } + return 0 + } + + return extractPort(item["port"]) +} + +// getLabels extracts the target labels from the item using the compiled CEL expressions if provided, +// otherwise it falls back to the default "labels" field +func (l *Loader) getLabels(item map[string]any, full any, cm *compiledMapping) map[string]string { + result := make(map[string]string) + + if cm != nil && cm.labels != nil { + val, err := evalCEL(cm.labels, item, full) + if err != nil { + return result + } + m, ok := val.(map[string]any) + if !ok { + return result + } + for k, v := range m { + result[k] = fmt.Sprintf("%v", v) + } + } + + // fallback: direct + if raw, ok := item["labels"].(map[string]any); ok { + for key, val := range raw { + result[key] = fmt.Sprintf("%v", val) + } + } + return result +} + +// getTargetProfile extracts the target profile from the item using the compiled CEL expression if provided, +// otherwise it falls back to the default "targetProfile" field +func (l *Loader) getTargetProfile(item map[string]any, full any, cm *compiledMapping) string { + if cm.targetProfile != nil { + val, err := evalCEL(cm.targetProfile, item, full) + if err == nil { + if str, ok := val.(string); ok { + return str + } + } + return "" + } + + if val, ok := item["targetProfile"].(string); ok { + return val + } + return "" +} + +var celEnv = mustNewEnv() + +// mustNewEnv creates a CEL environment with the necessary variable declarations for evaluating expressions +func mustNewEnv() *cel.Env { + env, err := cel.NewEnv( + cel.Variable("self", cel.DynType), + cel.Variable("item", cel.DynType), + // Required for ext.Regex + cel.OptionalTypes(), + // Include standard CEL declarations for common operations and types + ext.Strings(), + ext.Math(), + ext.Lists(), + ext.Sets(), + ext.Regex(), + ext.Bindings(), + ) + if err != nil { + panic(err) + } + return env +} + +// compileCEL compiles a CEL expression into a program that can be evaluated against items +func compileCEL(expr string) (cel.Program, error) { + ast, issues := celEnv.Compile(expr) + if issues != nil && issues.Err() != nil { + return nil, issues.Err() + } + return celEnv.Program(ast, cel.EvalOptions(cel.OptOptimize)) +} + +// evalCEL evaluates a compiled CEL program against an item +func evalCEL(p cel.Program, item map[string]any, full any) (any, error) { + out, _, err := p.Eval(map[string]any{ + "self": full, + "item": item, + }) + if err != nil { + return nil, err + } + if out == nil { + return nil, fmt.Errorf("CEL returned nil") + } + + return normalizeCEL(out.Value()), nil +} + +// normalizeCEL recursively converts CEL evaluation results into standard Go types +func normalizeCEL(v any) any { + switch raw := v.(type) { + case ref.Val: + v := raw.Value() + if v == nil { + return nil + } + return normalizeCEL(v) + + case []any: + for i := range raw { + raw[i] = normalizeCEL(raw[i]) + } + return raw + } + + // For maps, keys are converted to strings + rv := reflect.ValueOf(v) + if rv.Kind() == reflect.Map { + out := make(map[string]any) + for _, key := range rv.MapKeys() { + k := fmt.Sprintf("%v", normalizeCEL(key.Interface())) + val := normalizeCEL(rv.MapIndex(key).Interface()) + out[k] = val + } + return out + } + + return v +} + +// extractPort converts a CEL evaluation result into an int32 port number, +// handling both numeric and string representations +func extractPort(val any) int32 { + switch v := val.(type) { + case float64: + if v < 0 || v > math.MaxInt32 { + return 0 + } + return int32(v) + + case string: + p, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return 0 + } + return int32(p) + + default: + return 0 + } +} diff --git a/internal/controller/discovery/loaders/http/mapping_test.go b/internal/controller/discovery/loaders/http/mapping_test.go new file mode 100644 index 00000000..2ba1623f --- /dev/null +++ b/internal/controller/discovery/loaders/http/mapping_test.go @@ -0,0 +1,156 @@ +package http + +import ( + "testing" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" + "github.com/go-logr/logr" +) + +func TestExtractTargetsAndMapping(t *testing.T) { + tests := []struct { + name string + config gnmicv1alpha1.HTTPConfig + raw any + validate func(t *testing.T, targets []core.DiscoveredTarget) + }{ + { + name: "direct mapping all fields", + config: gnmicv1alpha1.HTTPConfig{}, + raw: []any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": "9000", "labels": map[string]any{"env": "prod", "region": "us-east"}, "targetProfile": "edge-profile"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("direct mapping: expected 1 target, got %d", len(targets)) + } + tgt := targets[0] + if tgt.Name != "t1" { + t.Fatalf("direct mapping Name failed: got %q", tgt.Name) + } + if tgt.Address != "1.1.1.1" { + t.Fatalf("direct mapping Address failed: got %q", tgt.Address) + } + if tgt.Port != 9000 { + t.Fatalf("direct mapping Port failed: got %d", tgt.Port) + } + if tgt.Labels["env"] != "prod" || tgt.Labels["region"] != "us-east" { + t.Fatalf("direct mapping Labels failed: %#v", tgt.Labels) + } + if tgt.TargetProfile != "edge-profile" { + t.Fatalf("direct mapping TargetProfile failed: got %q", tgt.TargetProfile) + } + }, + }, + { + name: "CEL TargetsField extraction", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetsField: "self.results"}}, + raw: map[string]any{"results": []any{map[string]any{"name": "t1", "address": "1.1.1.1", "port": float64(22)}}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("TargetsField extraction failed: got %d targets", len(targets)) + } + }, + }, + { + name: "CEL Name mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Name: "item.hostname"}}, + raw: []any{map[string]any{"hostname": "host-1", "address": "10.0.0.1", "port": float64(830)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Name mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Name != "host-1" { + t.Fatalf("Name mapping failed: got %q", targets[0].Name) + } + }, + }, + { + name: "CEL Address mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Address: "item.ip"}}, + raw: []any{map[string]any{"name": "t1", "ip": "192.168.1.1", "port": float64(830)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Address mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Address != "192.168.1.1" { + t.Fatalf("Address mapping failed: got %q", targets[0].Address) + } + }, + }, + { + name: "CEL Port mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Port: "item.mgmt_port"}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "mgmt_port": float64(9000)}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Port mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Port != 9000 { + t.Fatalf("Port mapping failed: got %d", targets[0].Port) + } + }, + }, + { + name: "CEL Labels mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{Labels: `{"env": item.environment, "type": item.device_type}`}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "port": float64(830), "environment": "prod", "device_type": "router"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("Labels mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].Labels["env"] != "prod" || targets[0].Labels["type"] != "router" { + t.Fatalf("Labels mapping failed: %#v", targets[0].Labels) + } + }, + }, + { + name: "CEL TargetProfile mapping", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetProfile: `item.type == "edge" ? "edge-profile" : "default"`}}, + raw: []any{map[string]any{"name": "t1", "address": "10.0.0.1", "port": float64(830), "type": "edge"}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("TargetProfile mapping: expected 1 target, got %d", len(targets)) + } + if targets[0].TargetProfile != "edge-profile" { + t.Fatalf("TargetProfile mapping failed: got %q", targets[0].TargetProfile) + } + }, + }, + { + name: "CEL all mapping options combined", + config: gnmicv1alpha1.HTTPConfig{ResponseMapping: &gnmicv1alpha1.ResponseMappingSpec{TargetsField: "self.results", Name: "item.hostname", Address: "item.ip", Port: "item.port", Labels: `{"env": item.env}`, TargetProfile: `item.type == "edge" ? "edge-profile" : "default"`}}, + raw: map[string]any{"results": []any{map[string]any{"hostname": "host-1", "ip": "10.0.0.1", "port": float64(830), "env": "prod", "type": "edge"}}}, + validate: func(t *testing.T, targets []core.DiscoveredTarget) { + if len(targets) != 1 { + t.Fatalf("combined mapping: expected 1 target, got %d", len(targets)) + } + tgt := targets[0] + if tgt.Name != "host-1" || tgt.Address != "10.0.0.1" || tgt.Port != 830 || tgt.Labels["env"] != "prod" || tgt.TargetProfile != "edge-profile" { + t.Fatalf("combined mapping failed: %#v", tgt) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + loader := makeLoader(tt.config, nil) + targets, err := loader.extractTargetsFromResponse(tt.raw, logr.Discard()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + tt.validate(t, targets) + }) + } +} + +func TestMapItemsToTargetsSkipsInvalidItems(t *testing.T) { + loader := makeLoader(gnmicv1alpha1.HTTPConfig{}, nil) + tgts, err := loader.mapItemsToTargets([]any{"not-a-map", map[string]any{"name": "n", "address": "a"}}, nil, logr.Discard()) + if err != nil { + t.Fatalf("mapItemsToTargets failed: %v", err) + } + if len(tgts) != 1 || tgts[0].Name != "n" { + t.Fatalf("unexpected targets: %#v", tgts) + } +} diff --git a/internal/controller/discovery/loaders/http/pagination.go b/internal/controller/discovery/loaders/http/pagination.go new file mode 100644 index 00000000..fc4913e5 --- /dev/null +++ b/internal/controller/discovery/loaders/http/pagination.go @@ -0,0 +1,77 @@ +package http + +import ( + "fmt" + "net/http" + "net/url" + "strings" +) + +// extractNextPageInfo extracts pagination information from a response +func (l *Loader) extractNextPageInfo(raw any) (string, error) { + if l.spec.Pagination == nil || l.spec.Pagination.NextField == "" { + return "", nil + } + + // Extract next value + prog, err := compileCEL(l.spec.Pagination.NextField) + if err != nil { + return "", fmt.Errorf("invalid NextField CEL: %w", err) + } + out, _, err := prog.Eval(map[string]any{"self": raw}) + if err != nil { + return "", fmt.Errorf("CEL eval failed: %w", err) + } + if out == nil || out.Value() == nil { + return "", nil + } + + str, ok := out.Value().(string) + if !ok { + return "", fmt.Errorf("NextField must evaluate to string") + } + + return str, nil +} + +// Link header parsing +func extractNextFromLinkHeader(h http.Header) string { + link := h.Get("Link") + if link == "" { + return "" + } + + parts := strings.Split(link, ",") + for _, p := range parts { + if strings.Contains(p, `rel="next"`) { + start := strings.Index(p, "<") + end := strings.Index(p, ">") + if start != -1 && end != -1 { + return p[start+1 : end] + } + } + } + return "" +} + +// buildNextURL supports token and full URL +func (l *Loader) buildNextURL(currentURL, nextVal string) (string, error) { + if parsed, err := url.Parse(nextVal); err == nil && parsed.Scheme != "" { + return nextVal, nil // full URL + } + + if l.spec.Pagination.RequestParam == "" { + return "", fmt.Errorf("requestParam must be set for token pagination") + } + + parsedURL, err := url.Parse(currentURL) + if err != nil { + return "", err + } + + q := parsedURL.Query() + q.Set(l.spec.Pagination.RequestParam, nextVal) + parsedURL.RawQuery = q.Encode() + + return parsedURL.String(), nil +} diff --git a/internal/controller/discovery/loaders/http/pagination_test.go b/internal/controller/discovery/loaders/http/pagination_test.go new file mode 100644 index 00000000..707518d5 --- /dev/null +++ b/internal/controller/discovery/loaders/http/pagination_test.go @@ -0,0 +1,112 @@ +package http + +import ( + "net/http" + "strings" + "testing" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" +) + +func TestPaginationHelpersAndNextURL(t *testing.T) { + loader := makeLoader( + gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next", + RequestParam: "next", + }, + }, + nil, + ) + + next, err := loader.extractNextPageInfo(map[string]any{"next": "token"}) + if err != nil || next != "token" { + t.Fatalf("extractNextPageInfo failed: %v", err) + } + + nextURL, err := loader.buildNextURL("https://example.com/path", "token") + if err != nil || !strings.Contains(nextURL, "next=token") { + t.Fatalf("buildNextURL failed: %v, %s", err, nextURL) + } + + nextURL, err = loader.buildNextURL("https://example.com/path", "https://example.com/other") + if err != nil || nextURL != "https://example.com/other" { + t.Fatalf("buildNextURL absolute failed: %v, %s", err, nextURL) + } +} + +func TestPagination_ArrayNoPagination(t *testing.T) { + raw := []any{ + map[string]any{"name": "a"}, + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{}, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if next != "" { + t.Fatalf("expected empty next, got %s", next) + } +} + +func TestPagination_NextURL(t *testing.T) { + raw := map[string]any{ + "next": "http://example.com/page2", + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next", + }, + }, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if next != "http://example.com/page2" { + t.Fatalf("unexpected next: %s", next) + } +} + +func TestPagination_Token(t *testing.T) { + raw := map[string]any{ + "next_page_token": "abc", + } + + loader := &Loader{ + spec: gnmicv1alpha1.HTTPConfig{ + Pagination: &gnmicv1alpha1.PaginationSpec{ + NextField: "self.next_page_token", + RequestParam: "page_token", + }, + }, + } + + next, err := loader.extractNextPageInfo(raw) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if next != "abc" { + t.Fatalf("unexpected token: %s", next) + } +} + +func TestPagination_LinkHeader(t *testing.T) { + headers := http.Header{} + headers.Set("Link", `; rel="next"`) + + next := extractNextFromLinkHeader(headers) + + if next != "http://example.com/page2" { + t.Fatalf("unexpected next link: %s", next) + } +} diff --git a/internal/controller/discovery/message_processor.go b/internal/controller/discovery/message_processor.go index f573b1bc..a61e727a 100644 --- a/internal/controller/discovery/message_processor.go +++ b/internal/controller/discovery/message_processor.go @@ -4,8 +4,10 @@ import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" @@ -31,15 +33,17 @@ type MessageProcessor struct { // Events are deferred while snapshot is in progress deferredEvents []core.DiscoveryEvent targetCount int32 + updater core.StatusUpdater } // NewMessageProcessor wires a MessageProcessor instance -func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage) *MessageProcessor { +func NewMessageProcessor(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource, in <-chan []core.DiscoveryMessage, u core.StatusUpdater) *MessageProcessor { return &MessageProcessor{ client: c, scheme: s, targetSource: ts, in: in, + updater: u, } } @@ -229,16 +233,46 @@ func (m *MessageProcessor) processEvent(ctx context.Context, event core.Discover } // Apply events - err := m.applyEvent(ctx, event, logger) + result, err := m.applyEvent(ctx, event, logger) if err == nil { switch event.Event { case core.EventApply: - m.targetCount++ - m.updateStatus(ctx, logger) + if result == controllerutil.OperationResultCreated { + m.targetCount++ + } + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) case core.EventDelete: m.targetCount-- - m.updateStatus(ctx, logger) + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) } + } else { + // m.updateStatus(ctx, gnmicv1alpha1.SyncStatusError, err) } return err @@ -309,8 +343,32 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot "numOfDelete", nDelete, ) + errCount := 0 for _, e := range events { - m.applyEvent(ctx, e, logger) + _, err = m.applyEvent(ctx, e, logger) + if err != nil { + errCount++ + } + } + if errCount != 0 { + // m.updateStatus(ctx, gnmicv1alpha1.SyncStatusSyncedWithErrors, err) + } else { + // Because of idempotency, allTargets = desired state = targets existing in Kubernetes. Overwrites the counter to "reset" it. + m.targetCount = int32(len(allTargets)) + m.updater.UpdateStatus( + ctx, + core.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: core.ConditionTypeReady, + Status: metav1.ConditionStatus("True"), + Reason: string(core.ReasonSyncSucceeded), + Message: "Successfully synced all targets", + }, + }, + TargetsCount: &m.targetCount, + }, + ) } // Replay deferred events @@ -325,53 +383,42 @@ func (m *MessageProcessor) applySnapshot(ctx context.Context, snapshot *snapshot } } - // Because of idempotency, allTargets = desired state = targets existing in Kubernetes. Overwrites the counter to "reset" it. - m.targetCount = int32(len(allTargets)) - m.updateStatus(ctx, logger) - m.resetSnapshot() m.deferredEvents = nil return nil } -func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) error { +// applyEvent applies a DiscoveryEvent to the Kubernetes cluster and returns controllerutil.OperationResult to identify create or update events. Returns controllerutil.OperationResultNone for delete or on errors +func (m *MessageProcessor) applyEvent(ctx context.Context, event core.DiscoveryEvent, logger logr.Logger) (controllerutil.OperationResult, error) { switch event.Event { case core.EventDelete: if err := deleteTarget(ctx, m.client, event.Target.Name, m.targetSource.Namespace); err != nil { logger.Error(err, "error deleting target", "targetName", event.Target.Name, ) - return err + return controllerutil.OperationResultNone, err } else { logger.Info("deleted target object", "name", event.Target.Name, ) + return controllerutil.OperationResultNone, err } case core.EventApply: target := generateTargetResource(event.Target, m.targetSource) - if err := applyTarget(ctx, m.client, m.scheme, target, m.targetSource); err != nil { + if result, err := applyTarget(ctx, m.client, m.scheme, target, m.targetSource); err != nil { logger.Error(err, "error applying target", "targetName", event.Target.Name, ) - return err + return controllerutil.OperationResultNone, err } else { logger.Info("applied target object", "name", event.Target.Name, ) + return result, nil } - } - - return nil -} - -func (m *MessageProcessor) updateStatus(ctx context.Context, logger logr.Logger) { - if err := updateTargetSourceStatus(ctx, m.client, m.targetSource, m.targetCount); err != nil { - logger.Error(err, "error updating TargetSource status") - } else { - logger.Info("updated target source status", - "targetCount", m.targetCount, - ) + default: + return controllerutil.OperationResultNone, fmt.Errorf("unknown event type %s", event.Event) } } diff --git a/internal/controller/discovery/ressource_fetcher_client.go b/internal/controller/discovery/ressource_fetcher_client.go new file mode 100644 index 00000000..c544b30a --- /dev/null +++ b/internal/controller/discovery/ressource_fetcher_client.go @@ -0,0 +1,60 @@ +package discovery + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// k8sResourceFetcher implements core.ResourceFetcher using a controller runtime client +type k8sResourceFetcher struct { + client client.Client +} + +// GetSecretKey retrieves the value of a specific key from a Kubernetes Secret +func (f *k8sResourceFetcher) GetSecretKey(ctx context.Context, namespace string, selector *corev1.SecretKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var secret corev1.Secret + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &secret); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("secret key selector has empty key for secret %s/%s", namespace, selector.Name) + } + val, ok := secret.Data[selector.Key] + if !ok { + return "", fmt.Errorf("secret %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return string(val), nil +} + +// GetConfigMapKey retrieves the value of a specific key from a Kubernetes ConfigMap +func (f *k8sResourceFetcher) GetConfigMapKey(ctx context.Context, namespace string, selector *corev1.ConfigMapKeySelector) (string, error) { + if selector == nil { + return "", nil + } + var cm corev1.ConfigMap + key := client.ObjectKey{Namespace: namespace, Name: selector.Name} + if err := f.client.Get(ctx, key, &cm); err != nil { + return "", err + } + if selector.Key == "" { + return "", fmt.Errorf("config map key selector has empty key for config map %s/%s", namespace, selector.Name) + } + val, ok := cm.Data[selector.Key] + if !ok { + return "", fmt.Errorf("config map %s/%s does not contain key %s", namespace, selector.Name, selector.Key) + } + return val, nil +} + +func newK8sResourceFetcher(c client.Client) core.ResourceFetcher { + return &k8sResourceFetcher{client: c} +} diff --git a/internal/controller/discovery/status_updater_client.go b/internal/controller/discovery/status_updater_client.go new file mode 100644 index 00000000..16425361 --- /dev/null +++ b/internal/controller/discovery/status_updater_client.go @@ -0,0 +1,69 @@ +package discovery + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + + gnmicv1alpha1 "github.com/gnmic/operator/api/v1alpha1" + "github.com/gnmic/operator/internal/controller/discovery/core" +) + +// k8sStatusUpdater is a client which fulfills the StatusUpdater interface +type k8sStatusUpdater struct { + client client.Client + scheme *runtime.Scheme + targetSource *gnmicv1alpha1.TargetSource +} + +// Returns an instance of k8sStatusUpdater +func NewK8sStatusUpdater(c client.Client, s *runtime.Scheme, ts *gnmicv1alpha1.TargetSource) *k8sStatusUpdater { + return &k8sStatusUpdater{ + client: c, + scheme: s, + targetSource: ts, + } +} + +// UpdateStatus takes a StatusUpdate holding Conditions and a pointer referencing the TargetsCount. +// If TargetsCount is set, the LastSync time gets set to metav1.Now(). +// Replaces LastTransitionTime of each Condition with metav1.Now(). +func (c *k8sStatusUpdater) UpdateStatus(ctx context.Context, update core.StatusUpdate) error { + + return c.patchStatus(ctx, func( + ts *gnmicv1alpha1.TargetSource, + ) { + now := metav1.Now() + + // Update status fields: Replace all Conditions and set TargetsCount and LastSync if pointer != nil + for i := range update.Conditions { + update.Conditions[i].LastTransitionTime = now + } + ts.Status.Conditions = update.Conditions + + if update.TargetsCount != nil { + ts.Status.TargetsCount = *update.TargetsCount + ts.Status.LastSync = now + } + }) +} + +// patchStatus is an internal function to update the Kubernetes object +func (c *k8sStatusUpdater) patchStatus(ctx context.Context, mutate func(*gnmicv1alpha1.TargetSource)) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &gnmicv1alpha1.TargetSource{} + if err := c.client.Get(ctx, client.ObjectKeyFromObject(c.targetSource), latest); err != nil { + return err + } + + patch := client.MergeFrom(latest.DeepCopy()) + mutate(latest) + + return c.client.Status().Patch(ctx, latest, patch) + }) + + return err +} diff --git a/internal/controller/targetsource_controller.go b/internal/controller/targetsource_controller.go index 7f30fc85..e2ffef81 100644 --- a/internal/controller/targetsource_controller.go +++ b/internal/controller/targetsource_controller.go @@ -20,8 +20,10 @@ import ( "context" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -104,8 +106,8 @@ func (r *TargetSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, err } - targetSource.Status.ObservedGeneration = targetSource.Generation - if err := r.Status().Update(ctx, targetSource); err != nil { + // Update TargetSource Status for new generation + if err := r.updateObservedGeneration(ctx, targetSource); err != nil { return ctrl.Result{}, err } @@ -179,9 +181,25 @@ func (r *TargetSourceReconciler) startDiscovery( ) error { targetChannel := make(chan []discoveryTypes.DiscoveryMessage, r.BufferSize) ctx, cancel := context.WithCancel(context.Background()) + + statusUpdater := discovery.NewK8sStatusUpdater(r.Client, r.Scheme, targetSource) + if err := statusUpdater.UpdateStatus(ctx, discoveryTypes.StatusUpdate{ + Conditions: []metav1.Condition{ + { + Type: discoveryTypes.ConditionTypeReady, + Status: metav1.ConditionFalse, + Reason: string(discoveryTypes.ReasonWaitingForSync), + Message: "Waiting for initial sync", + }, + }, + }); err != nil { + logger.Error(err, "updating targetsource status failed") + } + loaderConfig := discoveryTypes.CommonLoaderConfig{ TargetsourceNN: key, ChunkSize: r.ChunkSize, + Updater: statusUpdater, } // Cleanup function to cleanup discovery runtime of targetsource @@ -195,8 +213,9 @@ func (r *TargetSourceReconciler) startDiscovery( r.Scheme, targetSource, targetChannel, + statusUpdater, ) - loader, err := discovery.NewLoader(&loaderConfig, targetSource.Spec) + loader, err := discovery.NewLoader(ctx, r.Client, &loaderConfig, targetSource.Spec) if err != nil { logger.Error(err, "Target loader could not be created") cleanup() @@ -238,6 +257,22 @@ func (r *TargetSourceReconciler) startDiscovery( return nil } +func (r *TargetSourceReconciler) updateObservedGeneration(ctx context.Context, ts *gnmicv1alpha1.TargetSource) error { + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + latest := &gnmicv1alpha1.TargetSource{} + if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ts), latest); err != nil { + return err + } + + patch := client.MergeFrom(latest.DeepCopy()) + latest.Status.ObservedGeneration = ts.Generation + + return r.Client.Status().Patch(ctx, latest, patch) + }) + + return err +} + // SetupWithManager sets up the controller with the Manager. func (r *TargetSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). diff --git a/test.mk b/test.mk index 23c59834..67f19239 100644 --- a/test.mk +++ b/test.mk @@ -153,5 +153,5 @@ apply-test-clusters: ## Apply the test clusters for testing kubectl apply -f test/integration/resources/clusters .PHONY: apply-test-resources -apply-test-resources: apply-test-targets apply-test-subscriptions apply-test-outputs apply-test-pipelines apply-test-clusters +apply-test-resources: apply-test-targets apply-test-targetsources apply-test-subscriptions apply-test-outputs apply-test-pipelines apply-test-clusters diff --git a/test/integration/resources/targetsources/http.yaml b/test/integration/resources/targetsources/http.yaml index 422cfdcd..6f0f694d 100644 --- a/test/integration/resources/targetsources/http.yaml +++ b/test/integration/resources/targetsources/http.yaml @@ -6,6 +6,7 @@ spec: provider: http: url: http://http-svc.default.svc/targets.json + interval: 60s targetLabels: integrationtest: http - targetProfile: default \ No newline at end of file + targetProfile: default