Skip to content

feat(filter): expose openfilter reply port via appProtocol on ServicePort#64

Open
nikitos wants to merge 5 commits into
PlainsightAI:mainfrom
nikitos:pr/service2
Open

feat(filter): expose openfilter reply port via appProtocol on ServicePort#64
nikitos wants to merge 5 commits into
PlainsightAI:mainfrom
nikitos:pr/service2

Conversation

@nikitos

@nikitos nikitos commented May 26, 2026

Copy link
Copy Markdown

Summary

Adds an appProtocol field to ServicePort so filters that speak the OpenFilter ZMQ stream protocol get their reply channel exposed automatically. When a service port sets appProtocol: openfilter, the controller exposes a second Service port at port + 1 (the reply channel paired with the data port). Plain TCP ports are unchanged.

What changed

  • api/v1alpha1/pipeline_types.go: new AppProtocol enum (tcp | openfilter) and an optional appProtocol field on ServicePort, defaulting to tcp. Adds a validation rule requiring port <= 65534 when appProtocol = openfilter so the derived port + 1 cannot overflow.
  • internal/controller/pipelineinstance_controller_streaming.go: in ensureFilterServices, when appProtocol = openfilter, append a second Service port named <filter>-reply at port + 1 / targetPort + 1. Default tcp behavior is unchanged.
  • config/crd/bases and deployment/.../crds: regenerated CRDs with the new field and validation rule.

Testing

Unit tests for ensureFilterServices: openfilter with an explicit targetPort gives two ports with correct ports, targetPorts, names, and protocols; default protocol gives a single port (regression guard); openfilter without an explicit targetPort falls back to port and targets port + 1.

Notes

The OpenFilter container owns the port + 1 listener; the controller only wires the Service.

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution and for digging into the port + 1 pattern — this maps to a real OpenFilter wire-protocol convention (every TCP filter binds the configured port for PUB/SUB frames AND port + 1 for the PUSH/PULL back-channel; see openfilter/filter_runtime/zeromq.py around lines 215-218 and 572-575). Exposing both ports externally so out-of-cluster subscribers can use the back-channel is a legitimate use case.

Before continuing to iterate on isFilter, I'd like to propose a slightly different shape that we think serves this use case better in the long run. Posting as a suggestion, not a blocker — happy to discuss.

Suggested direction: replace isFilter with appProtocol

The current isFilter boolean conflates two things: (1) "this port carries the OpenFilter ZMQ stream," and (2) "therefore pair port + 1." That makes the API protocol-specific and hard to extend — if another OpenFilter convention or another application protocol (RTSP, gRPC, etc.) ever needs special handling, we'd be adding more booleans.

A cleaner shape is an enum that names the application protocol carried over the port, and lets the controller derive the right exposure behavior from that:

type ServicePort struct {
    Name       string
    Port       int32
    TargetPort *int32
    Protocol   corev1.Protocol      // unchanged: TCP/UDP (transport layer)
    Type       corev1.ServiceType   // ClusterIP / LoadBalancer (cf. PR #65)

    // appProtocol declares the application protocol carried over this port.
    // The controller uses this to apply protocol-specific exposure logic.
    //   - "tcp"        (default): single port, no special handling
    //   - "http"       : HTTP/HTTPS — eligible for L7 routing by upstream actors
    //   - "openfilter" : OpenFilter ZMQ stream — also exposes `port + 1`
    //                    (PUSH/PULL reply channel paired with the PUB/SUB data port)
    // +optional
    // +kubebuilder:default=tcp
    // +kubebuilder:validation:Enum=tcp;http;openfilter
    AppProtocol AppProtocol `json:"appProtocol,omitempty"`
}

This composes cleanly with PR #65 (which adds type) — together a user writes:

services:
  - name: detector
    port: 5550
    appProtocol: openfilter   # auto-exposes 5550 + 5551
    type: LoadBalancer        # on standalone clusters with an LB provisioner

Why this is better

  1. Naming describes effect. A reader sees appProtocol: openfilter and immediately understands both the wire protocol and why two ports appear.
  2. Forward-compatible. Future protocols (rtsp, grpc, …) become additive enum values, not new booleans.
  3. Bridges to platform integrations. Downstream actors (e.g., the platform deployment-agent that today emits Gateway HTTPRoutes for HTTP services) can switch on appProtocol to choose the right exposure mechanism — HTTPRoute for http, raw Service for openfilter/tcp, and future Gateway TCPRoute when adopted.
  4. Mirrors a native Kubernetes field. appProtocol is a real concept on corev1.ServicePort — this feels idiomatic.
  5. v1alpha1 is the right time to lock in this shape; no migration debt.

If this is a bigger change than you want to take on

Completely understood — we can pick it up on our side. In that case, the minimal version of this PR would be: rename IsFilter to something purpose-named (e.g., withReplyPort), fold the OpenFilter ZMQ context into the godoc, fix the markers (see inline), add overflow validation, and add tests. We'd then layer appProtocol on top later. Whichever path you prefer, please let us know.

Inline comments below cover issues that apply regardless of the final field shape (markers, overflow, port-naming in the controller, tests).

Comment thread api/v1alpha1/pipeline_types.go Outdated
Comment thread internal/controller/pipelineinstance_controller_streaming.go Outdated
@nikitos

nikitos commented May 28, 2026

Copy link
Copy Markdown
Author

i think i can change it to "appProtocol" concept

@nikitos

nikitos commented May 28, 2026

Copy link
Copy Markdown
Author

But what about "http" type? Both HTTPRoute and Ingress need configured controllers and lots of additional configuration (path, annotations etc). What if i leave only openfilter and tcp for now?

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great — happy to take the appProtocol direction.

On http: your reasoning is right, and I'd agree with shipping openfilter and tcp only for now. The principle is that an enum value should correspond to behavior the controller actually performs — http would currently be indistinguishable from tcp at the controller level (the HTTPRoute creation lives downstream in the platform's deployment-agent, not here). Exposing it would promise behavior we don't deliver yet.

Adding http later is purely additive in v1alpha1, so we don't lose anything by deferring it. When the controller (or a downstream actor we want to coordinate with) grows L7-aware routing, we can introduce http and any related fields (paths, annotations) at that point.

Suggested enum for now:

// +kubebuilder:default=tcp
// +kubebuilder:validation:Enum=tcp;openfilter
AppProtocol AppProtocol `json:"appProtocol,omitempty"`

Looking forward to the next iteration — ping me when it's ready and I'll re-review.

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent rework — the appProtocol design is in place, the XValidation rule for port <= 65534 when appProtocol=openfilter is exactly the overflow guard we wanted, the second port is now purpose-named (filter-reply), and the two tests cover both the openfilter case (with explicit TargetPort, exercising both +1 paths) and the default no-reply case.

A handful of small things to clean up before merge — see inline. Summary:

  • Const docstrings should start with the constant's name (Go convention).
  • The enum marker is duplicated between the AppProtocol type and the field — the generated CRD ends up with the enum listed twice in an allOf.
  • Tests reassign the package-level ctx (declared in suite_test.go:43) instead of declaring a local one — should use :=.
  • Heads-up coordination with PR #65: both PRs add an identical makeReconcilerWithFakeClient helper to the same test file, so whichever PR merges second will fail to compile.

Comment thread api/v1alpha1/pipeline_types.go Outdated
Comment thread api/v1alpha1/pipeline_types.go Outdated
Comment thread internal/controller/pipelineinstance_controller_streaming_test.go Outdated
Comment thread internal/controller/pipelineinstance_controller_streaming_test.go

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough rework and quick turnaround on the cleanups — const docs follow the Go convention, the duplicate enum marker is gone, and both tests use a local ctx. The appProtocol design landed cleanly, with the XValidation overflow guard and purpose-named filter-reply port. Looks good to merge.

Re: the makeReconcilerWithFakeClient overlap with #65 — deferring the conflict cleanup to whichever PR rebases after the other merges works for us. Thanks for the heads-up.

@shingonoide shingonoide left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution, @nikitos. The appProtocol field design is clean, the 65535 overflow guard is thoughtful, and the two new tests cover the happy path.

[bug] The XValidation CEL rule is attached to the wrong schema node, so the generated CRD is invalid. The marker sits on the Port int32 field (pipeline_types.go:157-160), so in the generated CRD it lands under the integer port property (config/crd/bases/filter.plainsight.ai_pipelines.yaml:388-398, and the deployment/.../crds copy). At that node self is the scalar integer port value, so self.appProtocol is a field selection on a scalar, which has no such field. Kubernetes type-checks CEL rules when the CRD is applied, so this rule would most likely fail to compile and the CRD would be rejected at apply time (HTTP 422), rather than installing and silently admitting bad values. Move the marker to the struct level on ServicePort (at struct scope the has() guard is unnecessary since appProtocol defaults to tcp), and regenerate both CRDs.

[nit] AppProtocolTCP is defined but unused (pipeline_types.go:141).

[suggestion] Add a test for appProtocol: openfilter with TargetPort == nil (streaming.go:608-610 fallback is unexercised).

[suggestion] Reply port name is not index-qualified: Service object name is <instance>-<filter>-<idx> (streaming.go:603) but port names are filterName / filterName-reply (streaming.go:635,647). Appending the index keeps them unambiguous if a filter has two entries.

[info] Merge-order dependency with #65: both PRs add an identical makeReconcilerWithFakeClient to the same test file (absent on main); whichever merges second fails to build until the duplicate is removed.

@nikitos

nikitos commented Jun 8, 2026

Copy link
Copy Markdown
Author

i fixed tests and CRD specs

@nikitos

nikitos commented Jun 8, 2026

Copy link
Copy Markdown
Author

[suggestion] Reply port name is not index-qualified: Service object name is <instance>-<filter>-<idx> (streaming.go:603) but port names are filterName / filterName-reply (streaming.go:635,647). Appending the
index keeps them unambiguous if a filter has two entries.

port name should be unique in service so what for append index to it? often port have names 'metrics', 'http' and so on, and can have same name in different services

@shingonoide shingonoide left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the follow-up, @nikitos. Both main points from the earlier review are now addressed.

Fixed in bb2dd6ab:

  • The XValidation marker has been moved to the ServicePort struct level, so both generated CRDs now place x-kubernetes-validations under the object node instead of the integer port property. Since appProtocol defaults to tcp and port is required, the rule self.appProtocol != 'openfilter' || self.port <= 65534 always resolves without a has() guard, and the CRD type-checks at apply time. Both CRD copies match.
  • The nil-targetPort test (TestEnsureFilterServices_OpenFilterWithoutExplicitTargetPort) is in place and passes, covering the fallback where targetPort defaults to port and the reply channel to port + 1.

Please clean up before merge:

  • Remove the unused AppProtocolTCP constant (pipeline_types.go:141). Nothing in the controller or tests references it; only AppProtocolOpenFilter is used, so it is dead surface in the API.

Heads-up (coordination):

  • Merge-order with #65: both PRs add an identical makeReconcilerWithFakeClient helper to pipelineinstance_controller_streaming_test.go, which is absent on main. Whichever merges second will fail to build on a duplicate declaration, so the second one will need the helper removed on rebase.

Build and all three TestEnsureFilterServices tests pass cleanly at HEAD. Nice work.

@shingonoide shingonoide changed the title expose two ports for filter feat(filter): expose openfilter reply port via appProtocol on ServicePort Jun 26, 2026

@shingonoide shingonoide left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @nikitos. The appProtocol approach is clean and the tests cover the data and reply paths well.

One thing is keeping the lint check red: the goconst rule flags the literal "filter" used three times in pipelineinstance_controller_streaming_test.go (around line 981). Please hoist it into a const (for example const filterName = "filter") and use it in those assertions; that clears the check, and everything else already passes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants