feat: multi-source pipeline instances — streaming + batch (PLAT-1141)#74
Adds the per-filter source-binding shape so a single PipelineInstance can run a multi-camera pipeline where each VideoIn filter consumes a distinct PipelineSource. Streaming mode is fully supported; batch mode keeps single-source (multi-source batch is tracked as a focused follow-up that needs the N-init-claimer rework).
CRD changes (api/v1alpha1/pipelineinstance_types.go)
- Spec.SourceRef switches from value to *SourceReference and is marked deprecated. Existing single-source CRs continue to apply unchanged.
- New Spec.Sources []NamedSourceRef binds each PipelineSource to a specific filter container in the referenced Pipeline by name. NamedSourceRef.FilterName is constrained to DNS-1123 label rules so the value can flow safely into generated container/volume/env names.
- listType=map + listMapKey=filterName makes Sources stable under server-side apply.
- New EffectiveSources() method on PipelineInstanceSpec normalizes both shapes into a uniform []NamedSourceRef. Legacy SourceRef becomes a one-entry list with FilterName="", the broadcast sentinel the reconciler uses to preserve today's all-containers behavior.
Reconciler changes
- New ResolvedSourceBinding pairs the bound filter name with the fetched PipelineSource. resolveSourceBindings (replaces getPipelineSource) builds the list once at the top of Reconcile and threads it through to reconcileStreaming / reconcileBatch.
- Streaming (pipelineinstance_controller_streaming.go): buildStreamingDeployment now splits bindings into a per-filter map and a broadcast slice, then injects RTSP_URL / _RTSP_USERNAME / _RTSP_PASSWORD into only the matching container plus any broadcast targets. The downstream container (webvis, etc.) gets no source env vars and keeps using its filter.config `sources: tcp://localhost:...` wiring exactly as before.
- Idle-timeout policy anchors to bindings[0].Source for V1; an explicit comment flags that "ANY vs ALL idle" semantics for multi-source streams is a follow-up decision.
- Batch (pipelineinstance_controller_batch.go): rejects multi-source with a clear MultiSourceBatchUnsupported condition; single-source batch flows through unchanged via bindings[0].Source. Multi-source batch needs the N-init-claimer reshape; tracked separately.
Tests
- pipelineinstance_multisource_test.go: three new test cases pin the contract: per-container RTSP_URL on the multi-source path, broadcast preserved on the legacy SourceRef path, and EffectiveSources normalization.
- 39 existing test sites updated for *SourceReference (was struct, now pointer) and the new reconcileBatch/Streaming / ensureStreamingDeployment signatures.
- All controller tests pass; controller package coverage 60.7% → 60.9%.
Generated artifacts (regenerated, not hand-edited)
- api/v1alpha1/zz_generated.deepcopy.go
- config/crd/bases/filter.plainsight.ai_pipelineinstances.yaml
- deployment/openfilter-pipelines-controller/crds/filter.plainsight.ai_pipelineinstances.yaml
Sample
- config/samples/pipelines_v1alpha1_pipelineinstance_stream_multisource.yaml demonstrates a two-camera binding referencing the front_cam / back_cam filter names. Wired into the kustomization.yaml index.
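The normalization and the per-filter/broadcast split described in this commit can be sketched roughly as follows. This is a minimal illustration, not the controller's code: the struct fields are trimmed stand-ins for the CRD types, and `splitBindings` is a hypothetical helper name for the split that buildStreamingDeployment is said to perform.

```go
package main

import "fmt"

// Simplified stand-ins for the CRD types (assumed field names; the real
// structs carry more fields and kubebuilder markers).
type SourceReference struct {
	Name string
}

type NamedSourceRef struct {
	FilterName string // "" is the broadcast sentinel
	SourceRef  SourceReference
}

type PipelineInstanceSpec struct {
	SourceRef *SourceReference // legacy single-source form
	Sources   []NamedSourceRef // multi-source form
}

// EffectiveSources normalizes both shapes into one list. Sources wins when
// both are set; nil is returned when neither is set.
func (s PipelineInstanceSpec) EffectiveSources() []NamedSourceRef {
	if len(s.Sources) > 0 {
		return s.Sources
	}
	if s.SourceRef != nil {
		return []NamedSourceRef{{FilterName: "", SourceRef: *s.SourceRef}}
	}
	return nil
}

// splitBindings (hypothetical name) separates bindings that target one
// filter container from broadcast bindings that apply to every container.
func splitBindings(bindings []NamedSourceRef) (map[string]NamedSourceRef, []NamedSourceRef) {
	perFilter := make(map[string]NamedSourceRef)
	var broadcast []NamedSourceRef
	for _, b := range bindings {
		if b.FilterName == "" {
			broadcast = append(broadcast, b)
			continue
		}
		perFilter[b.FilterName] = b
	}
	return perFilter, broadcast
}

func main() {
	legacy := PipelineInstanceSpec{SourceRef: &SourceReference{Name: "lobby-cam"}}
	perFilter, broadcast := splitBindings(legacy.EffectiveSources())
	fmt.Println("legacy:", len(perFilter), "per-filter,", len(broadcast), "broadcast")

	multi := PipelineInstanceSpec{Sources: []NamedSourceRef{
		{FilterName: "front-cam", SourceRef: SourceReference{Name: "front"}},
		{FilterName: "back-cam", SourceRef: SourceReference{Name: "back"}},
	}}
	perFilter, broadcast = splitBindings(multi.EffectiveSources())
	fmt.Println("multi:", len(perFilter), "per-filter,", len(broadcast), "broadcast")
}
```

Treating the empty FilterName as a sentinel keeps one code path for both shapes, at the cost of reserving the empty string.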
…source
Validated end-to-end against a docker-desktop cluster with a real RTSP stream: two videoin filters bound to distinct PipelineSources successfully co-tenant in one Pod and both stream at 24fps.
The first smoke attempt crashed back-cam with `ZMQError: Address already in use (addr='tcp://*:5551')` because front-cam's `outputs: tcp://*:5550` reserves both 5550 and 5551: openfilter binds the configured port AND port+1 for its ZMQ control channel (https://docs.plainsight.ai/docs/openfilter/your-first-filter/: "There is one idiosyncrasy with TCP connections to keep in mind, TWO ports are used, the port number you specify and that port number + 1").
Spacing outputs by 2 (5550, 5552, …) fixed the collision. Adding the warning to the multi-source sample so anyone copying it as a template gets the spacing right on the first try.
Controller-side behavior is unaffected: port selection lives in the user-authored `pipeline.spec.filters[].config` strings; the controller just plumbs them through verbatim.
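The port+1 rule above lends itself to a small authoring-time check. The sketch below is illustrative only: `portCollisions` and `spacedPorts` are hypothetical helpers, not part of the controller or of openfilter, and they assume every TCP output reserves exactly the configured port and the next one.

```go
package main

import "fmt"

// portCollisions returns every pair of configured output ports whose
// reserved ranges overlap, assuming each output reserves port and port+1.
func portCollisions(ports []int) [][2]int {
	var out [][2]int
	for i := 0; i < len(ports); i++ {
		for j := i + 1; j < len(ports); j++ {
			d := ports[i] - ports[j]
			if d < 0 {
				d = -d
			}
			// A distance of 0 or 1 means the two reserved ranges share a port.
			if d <= 1 {
				out = append(out, [2]int{ports[i], ports[j]})
			}
		}
	}
	return out
}

// spacedPorts returns n output ports starting at base, spaced by 2 so that
// each port's +1 companion stays free.
func spacedPorts(base, n int) []int {
	ports := make([]int, 0, n)
	for i := 0; i < n; i++ {
		ports = append(ports, base+2*i)
	}
	return ports
}

func main() {
	// The failing smoke-test layout: front-cam on 5550, back-cam on 5551.
	fmt.Println(portCollisions([]int{5550, 5551}))
	// The fixed layout.
	fmt.Println(spacedPorts(5550, 3), portCollisions(spacedPorts(5550, 3)))
}
```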
Second openfilter authoring constraint surfaced while empirically validating PR2 against two real RTSP streams (vod2stream-car-truck-person and vod2stream): every VideoIn must tag its output topic via the `;topic` suffix in its `sources` config, otherwise both VideoIns publish to the default topic 'main' and webvis (or any downstream fan-in filter) crashes with:
RuntimeError: duplicate topic 'main' from: VideoIn-… @ tcp://…
Fix is the `sources: "$(RTSP_URL);<filter-name>"` form on each VideoIn plus matching `tcp://…;<topic>` references in the downstream filter's sources.
Sample comment now covers both authoring constraints (port spacing + distinct topics) with a complete worked example.
Validated end-to-end: two distinct streams streaming concurrently in one Pod, webvis /data endpoint returns the multi-topic shape with each entry pinned to its own `meta.src` URI.
Controller code unchanged: both constraints live in the pipeline graph that the user (or plainsight-api export) authors.
Two small follow-ups to the PR2 main commit, surfaced while running the docker-desktop smoke tests against real RTSP streams.
CRD admission rule (`api/v1alpha1/pipelineinstance_types.go`)
+kubebuilder:validation:XValidation:rule="(has(self.sourceRef) && !has(self.sources)) || (!has(self.sourceRef) && has(self.sources) && size(self.sources) > 0)",message="exactly one of `sourceRef` or `sources` must be set (sources must be non-empty)"
surfaces the contract violation at apply time instead of silently letting `EffectiveSources()` pick `Sources` (when both are set) or returning nil (when neither is set). Verified live against docker-desktop: a CR with neither field is rejected, a CR with both is rejected, a CR with only `sources` is accepted.
DESIGN_DOCUMENT.md
- 3.3 PipelineInstance: documents both `sourceRef` (legacy) and `sources` (multi-source) shapes with the admission constraint.
- New 5.4 "Multi-source contract": covers EffectiveSources normalization, the per-container env injection rule, and the two authoring constraints (port spacing + per-VideoIn topic tags) that the smoke tests proved out empirically.
- New 5.5 "Multi-source batch — V1 status": documents the `MultiSourceBatchUnsupported` condition the batch reconciler surfaces today.
Generated CRDs regenerated via `make manifests generate helm-update-crds`. make test + make lint both green; coverage unchanged at 60.9%.
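For readers less familiar with CEL, the admission rule's truth table can be mirrored in plain Go. This is only an illustration of the predicate: the real check runs in the API server from the XValidation marker, and `validSourceShape` is a hypothetical name.

```go
package main

import "fmt"

// validSourceShape mirrors the CEL rule: exactly one of sourceRef or
// sources is present, and sources, when present, is non-empty.
func validSourceShape(hasSourceRef, hasSources bool, numSources int) bool {
	return (hasSourceRef && !hasSources) ||
		(!hasSourceRef && hasSources && numSources > 0)
}

func main() {
	fmt.Println("only sourceRef:", validSourceShape(true, false, 0))
	fmt.Println("only sources (2):", validSourceShape(false, true, 2))
	fmt.Println("both:", validSourceShape(true, true, 2))
	fmt.Println("neither:", validSourceShape(false, false, 0))
	fmt.Println("empty sources list:", validSourceShape(false, true, 0))
}
```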
Closes the multi-source batch gap that PR2 originally left as a follow-up.
Multi-camera benchmark runs from plainsight-api (PLAT-1140) now have an
end-to-end controller path; the rejection condition is gone.
Behavior
Batch mode now branches on `len(Sources)`:
- Single-source (legacy SourceRef or one-entry Sources): unchanged.
The existing queue-based parallel-files-per-bucket flow keeps
working bit-for-bit. List bucket → enqueue → N pods pop one file
each via Valkey → init claimer downloads → filter chain runs.
- Multi-source (≥2 entries in Sources): `reconcileBatchMultiSource`
emits a single Pod (parallelism=1, completions=1) with one init
claimer per binding in **direct mode** plus the user's filter
containers. Per-VideoIn `VIDEO_INPUT_PATH` is injected into the
matching container only; downstream containers are untouched.
Completion is the Job's own Succeeded/Failed status — no
per-file work-queue accounting.
CRD
`BucketSource.Object` (optional) names a specific S3 object key. Set
this for every binding in multi-source batch so each direct-mode
claimer has a deterministic target; the legacy queue path stays
prefix-based and ignores the field. Two new Degraded conditions
surface configuration errors:
- `MultiSourceBatchInvalidSource`: a binding's PipelineSource isn't
a Bucket (RTSP makes no sense for batch).
- `MultiSourceBatchMissingObject`: a binding's Bucket.Object is
unset.
Claimer
`cmd/claimer/main.go` learns a direct mode triggered by the new
`S3_OBJECT_KEY` env var: skip Valkey entirely, download exactly that
key to `VIDEO_INPUT_PATH`, exit. Legacy queue mode is unchanged when
the env is empty. `STREAM` and `GROUP` are now required only in queue
mode.
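A rough sketch of the mode switch described above, assuming a `loadConfig` that reads variables through an injected lookup function so it can be exercised without touching the process environment. The struct layout, error wording, and the `/ws/input.mp4` default are assumptions for illustration; only the env var names come from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a trimmed, hypothetical view of the claimer configuration.
type config struct {
	Bucket         string
	ObjectKey      string
	Stream         string
	Group          string
	VideoInputPath string
	Direct         bool
}

// loadConfig selects direct mode when S3_OBJECT_KEY is set; STREAM and
// GROUP are required only in queue mode. S3_BUCKET is required in both.
func loadConfig(getenv func(string) string) (config, error) {
	cfg := config{
		Bucket:         getenv("S3_BUCKET"),
		ObjectKey:      getenv("S3_OBJECT_KEY"),
		Stream:         getenv("STREAM"),
		Group:          getenv("GROUP"),
		VideoInputPath: getenv("VIDEO_INPUT_PATH"),
	}
	if cfg.Bucket == "" {
		return config{}, errors.New("S3_BUCKET is required")
	}
	if cfg.VideoInputPath == "" {
		cfg.VideoInputPath = "/ws/input.mp4" // assumed default
	}
	cfg.Direct = cfg.ObjectKey != ""
	if !cfg.Direct {
		if cfg.Stream == "" {
			return config{}, errors.New("STREAM is required in queue mode")
		}
		if cfg.Group == "" {
			return config{}, errors.New("GROUP is required in queue mode")
		}
	}
	return cfg, nil
}

func main() {
	env := map[string]string{
		"S3_BUCKET":        "media",
		"S3_OBJECT_KEY":    "videos/front.mp4",
		"VIDEO_INPUT_PATH": "/ws/front-cam.mp4",
	}
	cfg, err := loadConfig(func(k string) string { return env[k] })
	fmt.Println(cfg.Direct, cfg.VideoInputPath, err)
}
```

In a real binary the lookup would typically be `os.Getenv`; passing it in keeps the table tests free of global state.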
Reconciler
`reconcileBatch` lost the `MultiSourceBatchUnsupported` rejection and
gained a one-line branch into the new path. The single-source code
path is otherwise untouched.
New file `pipelineinstance_controller_batch_multisource.go` carries:
- `reconcileBatchMultiSource` (validation, Job ensure, status sync)
- `buildMultiSourceBatchJob` (Job builder)
- `buildDirectClaimerEnv` (per-binding direct-mode env)
- `buildBatchFilterContainersForMultiSource` (per-container env injection)
- `perFilterInputPath` (download path naming)
Tests
`TestBuildMultiSourceBatchJob_PerBindingInitClaimersAndEnv` pins:
- One init claimer per binding, named `claimer-<filterName>`
- Each claimer's S3_OBJECT_KEY + VIDEO_INPUT_PATH match its binding
- No Valkey env on direct-mode claimers
- Each VideoIn filter container has VIDEO_INPUT_PATH set; downstream
filters do not
- Job topology: parallelism=1, completions=1
make test + make lint both green.
Validated empirically on docker-desktop K8s with a fake S3 bucket
(claimers can't actually download but the controller-side shape is
the validation target): 2 init claimers named correctly, each with
its specific S3_OBJECT_KEY + path, per-VideoIn env on both
VideoIn filter containers, downstream `image-out` container left
clean.
DESIGN_DOCUMENT.md sections 3.3 and 5.5 updated to reflect that batch
multi-source is now supported and to document the
`MultiSourceBatchInvalidSource` / `MultiSourceBatchMissingObject`
operator-actionable failure modes.
Adds tests for the parts of the PR that were happy-path-only after the
PLAT-1141 work. Coverage gains:
- reconcileBatchMultiSource: 0% → 73.9%
- resolveSourceBindings: 80% → 86.7%
- cmd/claimer/loadConfig: 68.8% → 87.5%
Controller package overall: 58.6% → 63.0%. Claimer package: 17.6% → 19.1%.
New test files / functions
internal/controller/pipelineinstance_controller_batch_multisource_test.go
Pins the six branches of reconcileBatchMultiSource against a fake
client with status-subresource support:
1. RejectsNonBucketSource → MultiSourceBatchInvalidSource
2. RejectsMissingObject → MultiSourceBatchMissingObject
3. CreatesJobAndStampsStartTime → happy-path Job create
4. JobCompleteMarksSucceeded → JobComplete → PI Succeeded
5. JobFailedMarksDegraded → JobFailed → PI Degraded
(Reason/Message propagate)
6. JobProgressingStaysProgressing → active Job → PI Progressing
internal/controller/pipelineinstance_multisource_test.go
Two new tests for the streaming + resolver paths:
* MultiSource_WithCredentials — credentials-bearing per-binding RTSP
source must inject _RTSP_USERNAME / _RTSP_PASSWORD as secretKeyRefs
into ONLY the matching VideoIn container; one-with, one-without
coexist; credential secret must not leak into the credential-less
sibling.
* ResolveSourceBindings_SourceMissingSurfacesActionableError +
ResolveSourceBindings_LegacySourceRefMissingSurfacesError —
operator-quality error wording for both shapes.
cmd/claimer/main_test.go
TestLoadConfig_DirectMode (table) — five sub-tests covering the
S3_OBJECT_KEY conditional STREAM/GROUP relaxation, queue-mode-still-
requires-STREAM, queue-mode-still-requires-GROUP, S3_BUCKET still
required in either mode, and VIDEO_INPUT_PATH default in direct mode.
Known gap: cmd/claimer/main.go:run() direct-download branch stays at 0%
unit coverage. Testing it would require mocking the minio client or
spinning up a fake S3 server inside the test process; the function is
small delegation (loadConfig → createMinIOClient → downloadFile) and is
validated end-to-end by the docker-desktop smoke test that proved the
init claimers receive the correct env vars and execute.
make test + make lint both green.
Empirical validation against the real multi-camera filter chain
Reviewer asked about validation against the premium multi-camera filters.
Tested Pipeline shape:
Two distinct RTSP test streams bound via the new […]
Result: 5/5 containers Running, both cameras producing distinct content-appropriate detections. webvis […]
The traffic video gets vehicle labels; the people video gets person labels. That's proof, not inference, that each […]
What this rules out: binding swaps, broadcast leaks, cross-camera state contamination, all of which would show up as identical detection sets on both topics or identical […]
Caveat (called out for transparency): CPU inference latency was ~100s/frame on docker-desktop. That's the price of running rt-detr without GPU; the controller's responsibility (per-container env injection, downstream wiring through openfilter's ZMQ topology) is unaffected. Real deployments on a GPU cluster get real-time inference.
Adding the downstream […]
…PATH contract
Three fixes from the cross-repo contract review:
- Stream multisource sample used `filterName: front_cam` / `back_cam`, which is rejected at admission by the CRD's own DNS-1123 pattern and mismatches the comment's `front-cam` / `back-cam` Pipeline filters. All names (filterName, filter names, topics) now align hyphenated so there is one name per camera end-to-end, matching the sample's stated convention.
- Single-source batch now injects VIDEO_INPUT_PATH (= the claimer's download destination, spec.videoInputPath) into every filter container, mirroring the per-binding injection the multi-source path already does. This lets exported Pipelines author VideoIn sources as `file://$(VIDEO_INPUT_PATH)` uniformly; the plainsight-api export (PLAT-1140) switches to that form so the same exported YAML serves 1..N sources. Existing CRDs with hardcoded /ws/input.mp4 keep working (env injection is additive); injected before filter.Env so a user-supplied value still wins.
- Added the batch multisource sample the design doc calls for (direct-mode Job, per-binding claimers, Bucket.Object constraints, $(VIDEO_INPUT_PATH) authoring) and registered both multisource samples in kustomization.
Two fixes from the cross-repo PLAT-1071 review:
1. VIDEO_INPUT_PATH now precedes the FILTER_* config env in BOTH batch builders. Kubernetes dependent-env expansion only resolves $(VAR) references to variables defined earlier in the list. With the old config-first ordering, the `sources: file://$(VIDEO_INPUT_PATH)` authoring contract arrived in the container as a literal unexpanded string and multi-source batch was dead on arrival (the streaming builder already ordered RTSP env first and documents why). Ordering assertions added to the multisource unit test and the single-source envtest; the prior tests only asserted presence/value, which is how this slipped through.
2. Direct-mode claimers resolve their object key as Bucket.Object when set, else Bucket.Prefix used as the full key (new bindingObjectKey). The platform's media model has no separate object concept: the agent has always shipped single-file object keys in `prefix`, and the legacy queue path relied on prefix-as-key listing. The previous hard requirement on Bucket.Object (a field this PR itself introduced) would have Degraded every agent-deployed multi-source batch (MultiSourceBatchMissingObject) since the agent never populates it. Degrade now only when neither is set. Relatedly, the queue seeder honors an explicit Object (enqueue exactly that key, no prefix scan) so a hand-authored single-binding CR with Object doesn't silently process the whole prefix.
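To make the ordering bug concrete, here is a simplified simulation of dependent-env expansion together with an ordering check in the spirit of the assertions this commit adds. `EnvVar` is a local stand-in for the Kubernetes type, `expand` ignores escaping and other details of the real expansion, and `FILTER_SOURCES` is used purely as an example of a `FILTER_*` variable.

```go
package main

import (
	"fmt"
	"strings"
)

// EnvVar is a local stand-in for the Kubernetes container env entry.
type EnvVar struct {
	Name  string
	Value string
}

// expand resolves $(NAME) references using only variables defined earlier
// in the list; unresolved references are left as literal text.
func expand(env []EnvVar) []EnvVar {
	out := make([]EnvVar, 0, len(env))
	for _, e := range env {
		v := e.Value
		for _, prev := range out {
			v = strings.ReplaceAll(v, "$("+prev.Name+")", prev.Value)
		}
		out = append(out, EnvVar{Name: e.Name, Value: v})
	}
	return out
}

// envPrecedesFilterConfig reports whether name is present and appears
// before every FILTER_* variable in the list.
func envPrecedesFilterConfig(env []EnvVar, name string) bool {
	idx := -1
	for i, e := range env {
		if e.Name == name {
			idx = i
			break
		}
	}
	if idx < 0 {
		return false
	}
	for i, e := range env {
		if strings.HasPrefix(e.Name, "FILTER_") && i < idx {
			return false
		}
	}
	return true
}

func main() {
	configFirst := []EnvVar{
		{Name: "FILTER_SOURCES", Value: "file://$(VIDEO_INPUT_PATH)"},
		{Name: "VIDEO_INPUT_PATH", Value: "/ws/front-cam.mp4"},
	}
	pathFirst := []EnvVar{configFirst[1], configFirst[0]}

	fmt.Println(envPrecedesFilterConfig(configFirst, "VIDEO_INPUT_PATH"), expand(configFirst)[0].Value)
	fmt.Println(envPrecedesFilterConfig(pathFirst, "VIDEO_INPUT_PATH"), expand(pathFirst)[1].Value)
}
```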
The Object field was invented by this PR and never had a producer: the platform's media model carries single-file object keys in `prefix` (the agent parses the media URL path into it) and the legacy queue path has always relied on prefix-as-key listing. Keeping an optional field nobody sets is exactly the drift trap the cross-repo review caught (the multi-source path hard-required it while the agent never populated it). Since the field never shipped (unmerged PR), removing it is free.
- bindingObjectKey resolves Bucket.Prefix as the full object key; multi-source batch Degrades (MultiSourceBatchMissingObject) only when the prefix is empty.
- Queue-seeder early return removed with the field.
- CRD bases + Helm chart copies regenerated in lockstep; sample and design comments updated to the prefix-as-full-key contract.
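Under the prefix-as-full-key contract, the two multi-source batch validation failures can be pictured like this. The types are trimmed stand-ins and `validateMultiSourceBatch` is a hypothetical helper that returns a condition reason; only `bindingObjectKey` and the two reason strings are named in the PR, and their real signatures may differ.

```go
package main

import "fmt"

// Trimmed stand-ins for the CRD and controller types (assumed shapes).
type BucketSource struct {
	Name   string
	Prefix string // treated as the full object key in direct mode
}

type PipelineSource struct {
	Name   string
	Bucket *BucketSource // nil for non-bucket sources such as RTSP
}

type ResolvedSourceBinding struct {
	FilterName string
	Source     PipelineSource
}

// bindingObjectKey resolves the key a direct-mode claimer downloads.
func bindingObjectKey(b ResolvedSourceBinding) string {
	if b.Source.Bucket == nil {
		return ""
	}
	return b.Source.Bucket.Prefix
}

// validateMultiSourceBatch returns the Degraded reason for the first
// invalid binding, or "" when every binding is usable.
func validateMultiSourceBatch(bindings []ResolvedSourceBinding) string {
	for _, b := range bindings {
		if b.Source.Bucket == nil {
			return "MultiSourceBatchInvalidSource"
		}
		if bindingObjectKey(b) == "" {
			return "MultiSourceBatchMissingObject"
		}
	}
	return ""
}

func main() {
	bindings := []ResolvedSourceBinding{
		{FilterName: "front-cam", Source: PipelineSource{Name: "front", Bucket: &BucketSource{Name: "media", Prefix: "videos/front.mp4"}}},
		{FilterName: "back-cam", Source: PipelineSource{Name: "back", Bucket: &BucketSource{Name: "media"}}},
	}
	fmt.Println(validateMultiSourceBatch(bindings))
}
```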
The stream multisource sample presented publisher-side ;topic tags as the only way to avoid the duplicate-topic fan-in crash. The plainsight-api export actually emits the other equivalent form — VideoIns stay on default `main`, the fan-in filter remaps per upstream (;main><name>_main) — now empirically validated end-to-end on docker-desktop (both remapped topics at full frame rate). The comment presents both forms and says which one the platform produces, so hand-authors and reviewers don't conclude exported pipelines are miswired.
… bindings, name caps
Review findings 2.2-2.9:
- 2.2 Degraded validation states self-heal: SetupWithManager gains a PipelineSource→PipelineInstance watch (maps source events to every referencing instance via EffectiveSources, covering sources[] and the legacy singular ref), and clearDegradedReason (now variadic) flips Degraded back to False once multi-source batch validation passes. Previously a fixed PipelineSource never re-triggered reconcile and instances reported Degraded=True + Succeeded=True forever.
- 2.3 A sources[].filterName matching no pipeline.spec.filters[].name now Degrades with SourceBindingUnmatched (message lists unmatched + available names) in ALL modes; it was a silent no-op leaving the mistargeted VideoIn inputless while the instance reported Running. Cleared automatically once bindings validate.
- 2.5 claimer-<filterName> container names cap at 63 chars (deterministic 55-char filterName budget, trailing-hyphen trim); a CRD-valid 56+ char filterName previously produced an apiserver-rejected Job and an error-requeue loop.
- 2.6 Stale comment promising uninjected env fixed.
- 2.7/2.8 DESIGN_DOCUMENT: §5.4 no longer claims batch multi-source is rejected (contradicting §5.5); documents the unmatched-binding validation, the idle-timeout sourceBindings[0] anchoring caveat, and the prefix-as-key contract (stale Bucket.Object reference removed).
- 2.9 Multisource unit tests use CRD-valid hyphenated filter names.
New tests: Degraded-clears-on-recovery reconcile cycle, claimer name truncation, watch mapping fn, unmatched-binding Degradation in both modes + clear-on-fix.
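The name cap from finding 2.5 is simple arithmetic: the `claimer-` prefix is 8 characters, which leaves 55 of the 63-character limit for the filter name. A minimal sketch follows; `claimerContainerName` is a hypothetical function name, and byte slicing is safe here only because DNS-1123 filter names are ASCII.

```go
package main

import (
	"fmt"
	"strings"
)

const (
	claimerPrefix = "claimer-"
	maxNameLen    = 63 // DNS-1123 label limit for container names
)

// claimerContainerName builds claimer-<filterName>, truncating the filter
// name to the remaining budget and trimming any hyphen left at the end so
// the result is still a valid label.
func claimerContainerName(filterName string) string {
	budget := maxNameLen - len(claimerPrefix) // 55
	if len(filterName) > budget {
		filterName = strings.TrimRight(filterName[:budget], "-")
	}
	return claimerPrefix + filterName
}

func main() {
	fmt.Println(claimerContainerName("front-cam"))
	long := strings.Repeat("a", 54) + "-" + strings.Repeat("b", 8)
	name := claimerContainerName(long)
	fmt.Println(len(name), name)
}
```

Truncation alone cannot guarantee uniqueness when two long names share their first 55 characters; the sketch does not address that case.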
- envtest specs for the PipelineInstance admission rules the agent and portal PRs build on: the sourceRef/sources CEL xor (both, neither, empty list), listMapKey uniqueness on sources[].filterName, and the DNS-1123 filterName pattern (16 specs against a real API server).
- table test for perFilterInputPath (claimer↔filter path contract: extension preservation, .mp4 default, edge cases).
- drop the Deprecated marker on spec.sourceRef: it is the first-class convenience form for single-source pipelines (broadcast semantics, no filter names needed), supported indefinitely alongside sources[]. Removes the now-stale SA1019 lint suppressions; CRDs regenerated and Helm copies synced.
…ces resolve
Found live in the cross-PR smoke test: the deployment agent applies the PipelineInstance before its PipelineSources (intentional: TOCTOU fix), so the first reconcile sets Degraded/PipelineSourceNotFound. The PipelineSource watch re-reconciled and built the deployment, but the stale condition was never cleared (clear-on-pass only covered SourceBindingUnmatched), and the agent's provisioning monitor tears the instance down on any lingering Degraded. Add the reason to the existing variadic clear and pin the two-phase race in a regression test.
Pull request overview
This PR extends the controller to support multi-source PipelineInstances (streaming + batch) by introducing per-filter source bindings while preserving the legacy single-source convenience form.
Changes:
- Adds `spec.sources[]` (list-map keyed by `filterName`) plus `EffectiveSources()` normalization, and updates CRD admission (CEL xor rule) to enforce exactly-one-of `sourceRef`/`sources`.
- Implements controller-side multi-source reconciliation: per-container RTSP env injection for streaming, and a new single-pod, multi-init-claimer “direct download” Job path for multi-source batch.
- Adds a new claimer “direct mode” (`S3_OBJECT_KEY`) and broad test/docs/sample updates to pin the contract.
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| internal/controller/pod_labels_test.go | Updates streaming deployment builder call signature for resolved source bindings. |
| internal/controller/pipelineinstance_streaming_update_test.go | Updates streaming update tests for SourceRef pointer + bindings slice. |
| internal/controller/pipelineinstance_multisource_test.go | Adds comprehensive multi-source unit tests (streaming env routing, batch job shape, binding validation, watch mapping, etc.). |
| internal/controller/pipelineinstance_controller.go | Introduces ResolvedSourceBinding, binding resolution, unmatched-binding validation, Degraded clearing improvements, and a PipelineSource watch+mapper. |
| internal/controller/pipelineinstance_controller_test.go | Updates controller integration tests for SourceRef pointer and validates VIDEO_INPUT_PATH ordering. |
| internal/controller/pipelineinstance_controller_streaming.go | Refactors streaming path to accept resolved bindings and inject RTSP env per-container; anchors idle-timeout to first binding. |
| internal/controller/pipelineinstance_controller_reconcile_span_test.go | Updates reconcile-span tests to pass bindings slice and pointer SourceRef. |
| internal/controller/pipelineinstance_controller_batch.go | Routes multi-source batch to new reconciler; fixes env ordering by injecting VIDEO_INPUT_PATH before FILTER_*. |
| internal/controller/pipelineinstance_controller_batch_multisource.go | Adds new multi-source batch reconciler + job builder (N init claimers, direct mode, per-VideoIn VIDEO_INPUT_PATH). |
| internal/controller/pipelineinstance_controller_batch_multisource_test.go | Adds state-machine coverage for multi-source batch reconciler and helper behavior (naming, paths). |
| internal/controller/pipelineinstance_admission_test.go | Adds envtest coverage pinning CRD admission validation (xor rule, listMapKey uniqueness, filterName pattern). |
| DESIGN_DOCUMENT.md | Documents the new multi-source contract (streaming + batch constraints and behavior). |
| deployment/openfilter-pipelines-controller/crds/filter.plainsight.ai_pipelineinstances.yaml | Updates rendered CRD schema/docs: sources, sourceRef description, CEL xor validation. |
| config/samples/pipelines_v1alpha1_pipelineinstance_stream_multisource.yaml | Adds sample for multi-source streaming authoring + constraints. |
| config/samples/pipelines_v1alpha1_pipelineinstance_batch_multisource.yaml | Adds sample for multi-source batch direct-mode behavior + constraints. |
| config/samples/kustomization.yaml | Includes new sample manifests. |
| config/crd/bases/filter.plainsight.ai_pipelineinstances.yaml | Updates CRD base with sources and CEL xor validation. |
| cmd/claimer/main.go | Adds direct-download mode via S3_OBJECT_KEY and relaxes STREAM/GROUP requirements in that mode. |
| cmd/claimer/main_test.go | Adds tests covering direct-mode config loading and validation behavior. |
| api/v1alpha1/zz_generated.deepcopy.go | Updates deep-copies for new types and SourceRef pointer + Sources slice. |
| api/v1alpha1/pipelineinstance_types.go | Adds sources, makes sourceRef optional pointer, adds NamedSourceRef and EffectiveSources(), and updates validation/docs. |
Files not reviewed (1)
- api/v1alpha1/zz_generated.deepcopy.go: Generated file
leandrobmarinho
left a comment
Reviewed the controller-side multi-source support in depth. The binding->source mapping is correct: streaming splits bindings into a per-filter map + broadcast slice and injects each container's matching RTSP source (env ordered before FILTER_* so dependent-env expands); multi-source batch emits one claimer per binding writing /ws/ and injects VIDEO_INPUT_PATH into only the matching VideoIn; an unmatched filterName is caught up-front as Degraded, not silently dropped. Single-source backward-compat is preserved (sourceRef normalizes to a broadcast binding; CRD XValidation enforces exactly-one-of sourceRef/sources; deepcopy + the config/crd vs Helm CRD copies are in sync). Reconcile is idempotent (Get-then-Create/Patch), conditions use SetStatusCondition so no LastTransitionTime churn, and RBAC already covers pipelinesources. CI green. Looks good.
shingonoide
left a comment
Reviewed at f54935f. This is a clean, well-tested implementation of the multi-source controller half. The condition lifecycle work stands out: the self-healing Degraded clears, the PipelineSource/Pipeline watch map-functions, and the apply-order race fix (instance applied before its sources, then stale Degraded cleared on recovery) are exactly the kind of integration corners that bite in production, and they are pinned by tests. Status-update conflict handling that requeues with fresh state rather than continuing on a mutated Conditions slice is a nice touch. CI is green.
A few non-blocking notes:
- The ticket drafted a `FILTER_SOURCES` + deterministic-port (5550+index) injection scheme in the controller; the implementation instead injects per-container `RTSP_URL` (streaming) and `VIDEO_INPUT_PATH` (batch) and leaves topic/port fan-in to the filter `sources:` config. The PR body explains and validates this pivot with the 3-camera run, so this is just a request to keep that note discoverable (a one-liner in the design doc noting the controller no longer owns the port scheme would save the next reader the cross-check).
- `filterName` is constrained to DNS-1123 labels (lowercase alphanumeric + hyphen, no underscore). That is the right constraint since the name flows into container/volume/env names, but it is stricter than the original ticket text. Please confirm the deployment-agent (#65) and portal (#429) validate `filterName` identically so they never emit a name the CRD rejects at admission.
- Minor: a single-entry `sources[]` on a batch instance takes the legacy queue path and the binding's `filterName` is not used for routing (broadcast-equivalent). This is the intended no-regression behavior for one source and the unmatched-name validation still runs, so no change needed; a one-line comment noting it would help future readers.
Nothing here blocks. Nice work on the operability hardening.
jmiller-plainsight
left a comment
Code Review: Multi-Source Pipeline Instances (PLAT-1141)
Overall, this is an exceptionally high-quality, clean, and thoroughly tested PR. The API design, normalizations, and watchers are beautifully engineered. I have some minor suggestions regarding propagating status update errors on validation failures to ensure the degraded status is always persisted properly, and a small documentation note.
… buckets
New BucketSource.singleObject field: declares the prefix is a FULL object key (set by the deployment agent for single-file media kinds). A single named binding whose bucket declares it now routes through the direct-download path (one init claimer with S3_OBJECT_KEY, no Valkey, and the extension-preserving /ws/<filterName><ext> input path) instead of queue mode, whose fixed input.mp4 destination destroys the file extension and silently breaks extension-sensitive entry filters (image-in finds zero images and exits; the Job hangs forever on the waiting downstream). Found empirically testing batch image pipelines.
Backward compatible: bindings without the flag (hand-authored CRs, fileset prefixes) keep the queue flow; the legacy broadcast sentinel (singular sourceRef) is never routed direct. CRDs regenerated, Helm copies synced.
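The extension-preserving path can be sketched as below. The signature is a guess (the PR names `perFilterInputPath` but does not show it), and treating a bare trailing dot like a missing extension is an assumption made here for illustration.

```go
package main

import (
	"fmt"
	"path"
)

// perFilterInputPath returns /ws/<filterName><ext>, keeping the object
// key's extension and falling back to .mp4 when the key has none.
func perFilterInputPath(filterName, objectKey string) string {
	ext := path.Ext(objectKey)
	if ext == "" || ext == "." {
		ext = ".mp4" // default noted in the PR's table test description
	}
	return "/ws/" + filterName + ext
}

func main() {
	fmt.Println(perFilterInputPath("front-cam", "videos/front.mov"))
	fmt.Println(perFilterInputPath("still-cam", "images/frame.jpg"))
	fmt.Println(perFilterInputPath("back-cam", "videos/back"))
}
```

Keeping the extension is what lets an extension-sensitive entry filter such as image-in recognize its input, which the fixed input.mp4 destination of queue mode cannot do.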
Zero-byte keys ending in "/" (GCS/S3 console "folders") were enqueued as work items in queue mode; the placeholder then failed its download/decode attempts, landed in the DLQ, and failed the whole run. Observed empirically: an external_videos fileset over a console-created GCS folder processed both real videos (2/3 completions) and then failed the run on the "videos/" placeholder. Skip them at listing time; zero-byte regular files and non-empty trailing-slash keys are kept.
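The listing-time filter amounts to one predicate. A minimal sketch, using a hypothetical `object` struct in place of whatever listing type the queue seeder actually receives:

```go
package main

import (
	"fmt"
	"strings"
)

// object is a hypothetical stand-in for one bucket listing entry.
type object struct {
	Key  string
	Size int64
}

// isFolderPlaceholder matches only zero-byte keys ending in "/", the shape
// console-created "folders" take in S3 and GCS listings.
func isFolderPlaceholder(o object) bool {
	return o.Size == 0 && strings.HasSuffix(o.Key, "/")
}

// enqueueable drops placeholders and keeps everything else, including
// zero-byte regular files and non-empty trailing-slash keys.
func enqueueable(listing []object) []object {
	var out []object
	for _, o := range listing {
		if isFolderPlaceholder(o) {
			continue
		}
		out = append(out, o)
	}
	return out
}

func main() {
	listing := []object{
		{Key: "videos/", Size: 0},
		{Key: "videos/a.mp4", Size: 1024},
		{Key: "videos/b.mp4", Size: 2048},
	}
	for _, o := range enqueueable(listing) {
		fmt.Println(o.Key)
	}
}
```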
…ropagation, cross-ns watch
Review feedback on #74:
- Validation failures (unmatched filterName, multi-source batch invalid source / missing object key) now set Progressing=False alongside Degraded so the condition set reads consistently for automation; Succeeded is left untouched (it records a past terminal outcome).
- Status-update errors on those validation paths are propagated (conflict → requeue, else return the error) instead of being logged and swallowed. These paths intentionally don't requeue, so a swallowed write failure left the server-side status stale forever. Batch validation degrade centralized in degradeBatchValidation.
- pipelineInstancesForPipelineSource lists cluster-wide: a cross-namespace sourceRef (ref.namespace set) now wakes its instance on source events exactly like a same-namespace one; the per-ref namespace resolution already handled correctness.
- Steady-state batch reconcile (30s loop) skips the no-op status write when the Progressing condition is unchanged.
- Docs: DESIGN_DOCUMENT notes the port/topic ownership pivot (authoring-time, not controller-owned); reconcileBatch documents that the queue path's filterName is broadcast-equivalent.
lucasmundim
left a comment
Thanks for the thorough pass — all three notes addressed or answered:
1. Port-scheme ownership note: added to DESIGN_DOCUMENT.md in c7310c9 — a paragraph under the authoring-constraints section records the pivot from the ticket's controller-injected FILTER_SOURCES/port scheme to authoring-time wiring (API export or hand-authored), with the controller owning only per-binding env injection.
2. filterName validation alignment across repos: confirmed identical end-to-end. The API validates instanceName (which becomes filterName) against the same DNS-1123 regex fail-closed at instance create (validation.InstanceNameRegex, plainsight-api#861), the portal enforces it in the editor with the same pattern (INSTANCE_NAME_PATTERN, client-portal#429), and the agent passes through only API-validated names — the CRD pattern is the backstop, and the admission envtest in this PR pins it.
3. Single-entry sources[] comment: added on reconcileBatch in c7310c9 (same as jmiller's note). Also worth flagging since your review snapshot at f54935f: single named bindings with singleObject: true buckets now take the direct path (a5697a5) — the queue path and its broadcast-equivalence remain for everything else.
shingonoide
left a comment
Reviewed the multi-source CRD plus both reconcilers against PLAT-1141. The CEL xor on sourceRef / sources correctly rejects both-set and neither-set (including the empty sources: [] case via the explicit size() > 0), EffectiveSources() normalizes both forms into one path, and the generated deepcopy deep-copies the new slice rather than aliasing it. The per-binding env injection is ordered before FILTER_* on every builder so the $(RTSP_URL) / $(VIDEO_INPUT_PATH) expansion resolves, and the batch direct-download path preserves the file extension (the fix for the image-in case). The PipelineSource to PipelineInstance watch plus the Degraded self-heal closes the PI-before-sources creation race. CI green, including CRD-in-sync and chart parity.
Two non-blocking test-coverage gaps worth a follow-up:
- The GPU coexistence with PR #66 is wired on the streaming and batch multi-source paths via the shared applyGPUContainerSharing, but no test asserts nvidia.com/gpu / NVIDIA_VISIBLE_DEVICES placement on a Spec.Sources instance (the 3-cams + 1-GPU-filter topology the ticket calls out).
- The streaming RTSP_URL-before-FILTER_* ordering is correct in code but only the batch VIDEO_INPUT_PATH ordering is asserted by a test (assertEnvPrecedesFilterConfig). A streaming-side assertion would guard against future reordering regressions.
Approving. Reminder for the rollout: this needs a v* release (or a pinned CLAIMER_IMAGE) post-merge since claimer:latest predates direct mode, as the ticket notes.
….Object references
Review (Copilot round 2):
- The V1 idle-timeout policy anchored to sourceBindings[0], but spec.sources is listType=map, so element order is not semantically stable across patches. Anchor to the lexicographically-smallest filterName instead (idleAnchorBinding, table-tested), and log which binding anchored the timeout.
- DESIGN_DOCUMENT.md and a test comment still referenced the removed Bucket.Object field; corrected to the prefix-is-full-object-key contract.
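A minimal sketch of the order-independent anchor selection described in this commit, assuming a simplified binding type. The function name comes from the commit message, but its signature and the `SourceName` field are hypothetical.

```go
package main

import "fmt"

// ResolvedSourceBinding is a simplified stand-in; SourceName is a
// hypothetical placeholder for the fetched PipelineSource.
type ResolvedSourceBinding struct {
	FilterName string
	SourceName string
}

// idleAnchorBinding picks the binding with the lexicographically smallest
// FilterName, so the result does not depend on list order.
// The boolean is false when there are no bindings.
func idleAnchorBinding(bindings []ResolvedSourceBinding) (ResolvedSourceBinding, bool) {
	if len(bindings) == 0 {
		return ResolvedSourceBinding{}, false
	}
	anchor := bindings[0]
	for _, b := range bindings[1:] {
		// Go compares strings byte-wise, which is lexicographic for
		// the lowercase ASCII that DNS-1123 labels allow.
		if b.FilterName < anchor.FilterName {
			anchor = b
		}
	}
	return anchor, true
}

func main() {
	a, _ := idleAnchorBinding([]ResolvedSourceBinding{
		{FilterName: "front-cam", SourceName: "cam-a"},
		{FilterName: "back-cam", SourceName: "cam-b"},
	})
	b, _ := idleAnchorBinding([]ResolvedSourceBinding{
		{FilterName: "back-cam", SourceName: "cam-b"},
		{FilterName: "front-cam", SourceName: "cam-a"},
	})
	fmt.Println(a.FilterName, b.FilterName) // back-cam back-cam
}
```

Both orderings select the same binding, which is the property that indexing element 0 could not guarantee for a listType=map field.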
…ring assertions
Review (shingonoide): two coverage gaps closed:
- TestBuildStreamingDeployment_MultiSource_GPUCoexistence pins the 3-cams + 1-GPU-filter topology on a Spec.Sources instance: exactly one container holds the nvidia.com/gpu limit, the GPU filter carries NVIDIA_VISIBLE_DEVICES (and no RTSP_URL), the camera containers carry their per-binding RTSP_URL and no GPU env.
- TestBuildStreamingDeployment_RTSPURLPrecedesFilterConfig asserts the streaming-side RTSP_URL-before-FILTER_* ordering via the same assertEnvPrecedesFilterConfig helper that already guards the batch VIDEO_INPUT_PATH ordering.
lucasmundim
left a comment
Thanks — both coverage gaps closed in the latest push:
1. GPU coexistence on multi-source: TestBuildStreamingDeployment_MultiSource_GPUCoexistence pins the exact 3-cams + 1-GPU-filter topology on a Spec.Sources instance — exactly one container holds the nvidia.com/gpu limit, the GPU filter carries NVIDIA_VISIBLE_DEVICES (and no RTSP_URL), and each camera container carries its own per-binding RTSP_URL with no GPU env.
2. Streaming env ordering: TestBuildStreamingDeployment_RTSPURLPrecedesFilterConfig asserts RTSP_URL precedes every FILTER_* entry via the same assertEnvPrecedesFilterConfig helper that already guards the batch VIDEO_INPUT_PATH ordering.
And yes on the rollout reminder — the claimer release requirement is recorded in the PR's deploy note, the plan doc's merge-order section, and PLAT-1141's as-built notes, so it can't get lost between merge and deploy.
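The ordering check discussed above matters because Kubernetes expands $(VAR) references in a container's env list only against variables defined earlier in that list. A rough sketch of such a check follows; it is not the PR's assertEnvPrecedesFilterConfig helper, and the `EnvVar` type and function name are simplified assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// EnvVar is a simplified stand-in for a container env entry.
type EnvVar struct {
	Name  string
	Value string
}

// envPrecedesFilterConfig returns nil when the variable called name appears
// before every FILTER_* entry, so a later $(name) reference can expand.
func envPrecedesFilterConfig(env []EnvVar, name string) error {
	idx := -1
	for i, e := range env {
		if e.Name == name {
			idx = i
			break
		}
	}
	if idx < 0 {
		return fmt.Errorf("%s not found in env", name)
	}
	for i, e := range env {
		if strings.HasPrefix(e.Name, "FILTER_") && i < idx {
			return fmt.Errorf("%s (index %d) precedes %s (index %d)", e.Name, i, name, idx)
		}
	}
	return nil
}

func main() {
	good := []EnvVar{
		{Name: "RTSP_URL", Value: "rtsp://camera-a/stream"},
		{Name: "FILTER_SOURCES", Value: "$(RTSP_URL)"},
	}
	bad := []EnvVar{
		{Name: "FILTER_SOURCES", Value: "$(RTSP_URL)"},
		{Name: "RTSP_URL", Value: "rtsp://camera-a/stream"},
	}
	fmt.Println(envPrecedesFilterConfig(good, "RTSP_URL"))
	fmt.Println(envPrecedesFilterConfig(bad, "RTSP_URL"))
}
```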
Closes PLAT-1141, the controller half of PLAT-1071.
Summary
- PipelineInstance.Spec.Sources []NamedSourceRef, so a single PipelineInstance can bind N PipelineSources to N specific filter containers. The controller injects source-derived env vars into only the matching container, enabling multi-camera pipelines in one Pod.
- Spec.SourceRef (now *SourceReference) as a first-class convenience form for single-source pipelines: broadcast semantics, no filter names needed, supported indefinitely (un-deprecated during review). An admission-time CEL rule enforces exactly-one-of; EffectiveSources() normalizes both shapes into one reconciler path.
- Streaming: RTSP_URL (and _RTSP_USERNAME / _RTSP_PASSWORD when present) injected into the matching videoin-<filterName> container.
- Multi-source batch: Bucket.Prefix is treated as the full object key (how single-file medias ship from the platform; no separate Object field: an earlier draft added one, removed before merge since nothing produces it). The reconciler emits one Pod with N init claimer containers in direct mode (skip Valkey, download exactly that key to /ws/<filterName><ext>) and per-VideoIn VIDEO_INPUT_PATH injection, ordered before the filter's config env so K8s dependent-env expansion resolves file://$(VIDEO_INPUT_PATH). Single-source batch keeps today's queue-based parallel-files-per-bucket flow bit-for-bit.
- BucketSource.singleObject field: when a single named binding's bucket declares its prefix is a full object key (the agent sets this for single-file media kinds), the instance runs in the same direct-download path as multi-source: one claimer with S3_OBJECT_KEY, extension-preserving /ws/<filterName><ext> input path, no Valkey. Fixes single-source batch image pipelines (queue mode's fixed input.mp4 destination destroyed the extension; image-in silently found zero images and the Job hung forever, found empirically).
Hand-authored CRs without the flag and fileset prefixes keep queue mode; the legacy broadcast sourceRef is never routed direct.
- Folder placeholder objects (keys ending in /) are skipped at listing time. Previously one was enqueued as a queue-mode work item, failed its download/decode attempts, hit the DLQ, and failed the whole run (observed empirically on an external_videos fileset over a console-created GCS folder).
- PipelineSource to PipelineInstance watch (sourceRefs wake their instances too), so an instance applied before its sources self-heals; unmatched binding filterNames surface as Degraded/SourceBindingUnmatched and clear automatically once bindings validate; batch validation failures surface as Degraded/MultiSourceBatchInvalidSource/MultiSourceBatchMissingObject with operator-actionable messages, also cleared on recovery. Validation failures additionally set Progressing=False (consistent condition set for automation) and propagate status-write errors (conflict → requeue, else retry via returned error) instead of swallowing them on non-requeuing paths; the steady-state batch loop skips no-op status writes; claimer container names truncate safely at 63 chars (no trailing hyphen).
Validation
Unit tests
- make test green; make lint 0 issues.
- pipelineinstance_multisource_test.go pins both paths: streaming per-container RTSP_URL routing, singular-form broadcast preserved, EffectiveSources() normalization, multi-source batch per-binding direct-mode claimer + per-VideoIn env injection.
- Admission envtest (pipelineinstance_admission_test.go, 16 specs against a real API server): the sourceRef/sources CEL xor (both, neither, empty list), listMapKey uniqueness on sources[].filterName (duplicate filter rejected; two filters sharing one source accepted), and the DNS-1123 filterName pattern (9 accept/reject cases), pinning the admission contract the agent (plainsight-deployment-agent#65) and portal (client-portal#429) build on.
- TestPerFilterInputPath pins the claimer↔filter path contract (extension preservation, .mp4 default, edge cases).
End-to-end on docker-desktop K8s
- vod2stream-car-truck-person + vod2stream: 3/3 containers Running, 0 restarts; front-cam @ 24.0 fps, back-cam @ 23.976 fps; webvis /data returns the multi-topic JSON shape ({front_cam: {meta: {src: rtsp://A}}, back_cam: {meta: {src: rtsp://B}}}).
- filter-aggregator iterating frames.items(): consumed both topics cleanly ({frame_count: 78, sources: 2}).
- front_cam_main / back_cam_main at full frame rate: validates the API↔controller contract end-to-end.
- Sources[]: binding resolved, reconciler proceeds to bucket listing (fails on fake bucket as designed).
- claimer-front-cam and claimer-back-cam, each carrying its specific S3_OBJECT_KEY (front.mp4 / back.mp4) + VIDEO_INPUT_PATH (/ws/front-cam.mp4 / /ws/back-cam.mp4), no Valkey env on either; front-cam and back-cam filter containers each got VIDEO_INPUT_PATH; downstream image-out container has none; Job topology parallelism=1, completions=1.
Post-change verification (2026-06-12, all four PRs live, fresh CRDs/agent/claimer): single-source image → direct mode, Job 1/1 Complete in 11s (/ws/image-in.png, extension preserved; previously hung forever); single-source video → direct mode (Added direct-mode claimer … bindings: 1 in the log), completed; multi-source video (2 files, fan-in) completed in 42s; external_videos fileset → queue mode engaged (totalFiles: 2 after the placeholder fix), 2/2 completions, completed; 2-camera stream regression: 3/3 Running, both topics live.
Cross-PR smoke (2026-06-11, all four train PRs live: API #861 → agent #65 → this controller, agent-authored CRs)
- Degraded/PipelineSourceNotFound; the PipelineSource watch then self-healed the deployment but the stale condition was never cleared, and the agent's provisioning monitor tore the instance down on it. Now cleared on the same pass as SourceBindingUnmatched; the two-phase race is pinned by TestReconcile_PipelineSourceNotFound_ClearsOnceSourceExists.
- cam_a_main / cam_b_main / cam_c_main all live with correct per-camera meta.src; same media bound to two VideoIns (one shared ps-<media> CR, both bindings resolve to it, 3/3 Running); concurrent instances sharing a media (stopping one swept only its own sources: the survivor's shared source and pod were untouched, three separate times).
Deploy note: claimer image
Direct-download mode needs the claimer built from this PR. plainsightai/openfilter-pipelines-claimer:latest (the controller's default CLAIMER_IMAGE) only refreshes on v* release tags, so cut a release after merging (or pin CLAIMER_IMAGE to a sha tag) before relying on multi-source or singleObject batch; otherwise the claimer crashes with STREAM is required (verified empirically against the current latest).
Notes for pipeline authors (also documented in samples + DESIGN_DOCUMENT)
OpenFilter has two authoring constraints that bite every multi-source streaming pipeline. Both surfaced during the smoke test and would otherwise be silent runtime crashes:
- Non-adjacent ports for VideoIn outputs (the +1 carries ZMQ control plane). Use 5550, 5552, 5554, …; never 5550, 5551 (would crash the second VideoIn with Address already in use).
- Either ;topic tags on each VideoIn's sources (convention for hand-authoring), or subscriber-side remap ;main><name>_main at the fan-in filter (what the plainsight-api export emits; validated end-to-end). Without either, both VideoIns publish on the default topic main and any downstream fan-in crashes with RuntimeError: duplicate topic 'main'.
Neither is enforced by the controller; they live in the filter.config strings. The plainsight-api export (PLAT-1140, plainsight-api#861) bakes both in at export time; hand-authored CRs follow the samples.
For multi-source batch: every binding's PipelineSource must be a Bucket (RTSP makes no sense for batch) and its prefix must be a full object key. The reconciler surfaces violations as MultiSourceBatchInvalidSource / MultiSourceBatchMissingObject Degraded conditions with operator-actionable messages, and clears them once fixed.
Test plan
- make test green (including 16-spec admission envtest)
- make lint green (0 issues)
- Sources[]: routes through the new ResolvedSourceBinding path
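As a companion to the validation notes, the claimer↔filter path contract (extension preserved, .mp4 as the default) might look roughly like the sketch below. The function name and signature are hypothetical and the edge-case handling is an assumption; only the /ws/<filterName><ext> shape and the .mp4 default come from the description.

```go
package main

import (
	"fmt"
	"path"
)

// perFilterInputPath builds the shared-workspace path that both the claimer
// (download destination) and the filter (VIDEO_INPUT_PATH) would agree on.
// Assumption: an object key with no usable extension falls back to ".mp4".
func perFilterInputPath(filterName, objectKey string) string {
	ext := path.Ext(objectKey) // extension of the last path element, including the dot
	if ext == "" || ext == "." {
		ext = ".mp4"
	}
	return "/ws/" + filterName + ext
}

func main() {
	fmt.Println(perFilterInputPath("image-in", "media/image.png")) // /ws/image-in.png
	fmt.Println(perFilterInputPath("front-cam", "front.mp4"))      // /ws/front-cam.mp4
	fmt.Println(perFilterInputPath("back-cam", "clips/back"))      // /ws/back-cam.mp4
}
```

Deriving the path from the filter name rather than the object name keeps two bindings that point at the same media from colliding in /ws.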