Skip to content

feat: multi-source pipeline instances — streaming + batch (PLAT-1141)#74

Merged
lucasmundim merged 19 commits into
mainfrom
plat-1141-multi-source-crd-reconciler
Jun 12, 2026
Merged

feat: multi-source pipeline instances — streaming + batch (PLAT-1141)#74
lucasmundim merged 19 commits into
mainfrom
plat-1141-multi-source-crd-reconciler

Conversation

@lucasmundim

@lucasmundim lucasmundim commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Closes PLAT-1141, the controller half of PLAT-1071.

Summary

  • Adds PipelineInstance.Spec.Sources []NamedSourceRef so a single PipelineInstance can bind N PipelineSources to N specific filter containers — the controller injects source-derived env vars into only the matching container, enabling multi-camera pipelines in one Pod.
  • Keeps the singular Spec.SourceRef (now *SourceReference) as a first-class convenience form for single-source pipelines — broadcast semantics, no filter names needed, supported indefinitely (un-deprecated during review). An admission-time CEL rule enforces exactly-one-of; EffectiveSources() normalizes both shapes into one reconciler path.
  • Streaming: full multi-source. Per-binding RTSP_URL (and _RTSP_USERNAME / _RTSP_PASSWORD when present) injected into the matching videoin-<filterName> container.
  • Batch: full multi-source. Each binding's Bucket.Prefix is treated as the full object key (how single-file medias ship from the platform — no separate Object field; an earlier draft added one, removed before merge since nothing produces it); the reconciler emits one Pod with N init claimer containers in direct mode (skip Valkey, download exactly that key to /ws/<filterName><ext>) and per-VideoIn VIDEO_INPUT_PATH injection — ordered before the filter's config env so K8s dependent-env expansion resolves file://$(VIDEO_INPUT_PATH). Single-source batch keeps today's queue-based parallel-files-per-bucket flow bit-for-bit.
  • Single-binding direct mode (batch image fix): new BucketSource.singleObject field — when a single named binding's bucket declares its prefix is a full object key (the agent sets this for single-file media kinds), the instance runs in the same direct-download path as multi-source: one claimer with S3_OBJECT_KEY, extension-preserving /ws/<filterName><ext> input path, no Valkey. Fixes single-source batch image pipelines (queue mode's fixed input.mp4 destination destroyed the extension; image-in silently found zero images and the Job hung forever — found empirically). Hand-authored CRs without the flag and fileset prefixes keep queue mode; the legacy broadcast sourceRef is never routed direct.
  • Bucket-listing hygiene: zero-byte directory-placeholder objects (GCS/S3 console "folders", keys ending in /) are skipped at listing time — previously one was enqueued as a queue-mode work item, failed its download/decode attempts, hit the DLQ, and failed the whole run (observed empirically on an external_videos fileset over a console-created GCS folder).
  • Operability (review hardening): PipelineSource create/update/delete now triggers reconciliation of every PipelineInstance that references it (watch + map function, cluster-wide — cross-namespace sourceRefs wake their instances too), so an instance applied before its sources self-heals; unmatched binding filterNames surface as Degraded/SourceBindingUnmatched and clear automatically once bindings validate; batch validation failures surface as Degraded/MultiSourceBatchInvalidSource / MultiSourceBatchMissingObject with operator-actionable messages, also cleared on recovery. Validation failures additionally set Progressing=False (consistent condition set for automation) and propagate status-write errors (conflict → requeue, else retry via returned error) instead of swallowing them on non-requeuing paths; the steady-state batch loop skips no-op status writes; claimer container names truncate safely at 63 chars (no trailing hyphen).

Validation

Unit tests

  • make test green; make lint 0 issues.
  • pipelineinstance_multisource_test.go pins both paths: streaming per-container RTSP_URL routing, singular-form broadcast preserved, EffectiveSources() normalization, multi-source batch per-binding direct-mode claimer + per-VideoIn env injection.
  • CRD admission envtest (pipelineinstance_admission_test.go, 16 specs against a real API server): the sourceRef/sources CEL xor (both, neither, empty list), listMapKey uniqueness on sources[].filterName (duplicate filter rejected; two filters sharing one source accepted), and the DNS-1123 filterName pattern (9 accept/reject cases) — pinning the admission contract the agent (plainsight-deployment-agent#65) and portal (client-portal#429) build on.
  • TestPerFilterInputPath pins the claimer↔filter path contract (extension preservation, .mp4 default, edge cases).

End-to-end on docker-desktop K8s

  • Admission rule: rejects "neither" and "both" SourceRef/Sources configurations, accepts each individually (now also pinned by envtest, above).
  • Streaming with two real RTSP streams (vod2stream-car-truck-person + vod2stream): 3/3 containers Running, 0 restarts; front-cam @ 24.0 fps, back-cam @ 23.976 fps; webvis /data returns the multi-topic JSON shape ({front_cam: {meta: {src: rtsp://A}}, back_cam: {meta: {src: rtsp://B}}}).
  • Streaming with a real multi-topic filter (filter-aggregator iterating frames.items()): consumed both topics cleanly ({frame_count: 78, sources: 2}).
  • Streaming with platform-exported YAML (plainsight-api#861 export, subscriber-side topic remap): pod 3/3, both remapped topics (front_cam_main / back_cam_main) at full frame rate — validates the API↔controller contract end-to-end.
  • Batch single-source via Sources[]: binding resolved, reconciler proceeds to bucket listing (fails on fake bucket as designed).
  • Batch multi-source: Job created with 2 init claimers named claimer-front-cam and claimer-back-cam, each carrying its specific S3_OBJECT_KEY (front.mp4 / back.mp4) + VIDEO_INPUT_PATH (/ws/front-cam.mp4 / /ws/back-cam.mp4), no Valkey env on either; front-cam and back-cam filter containers each got VIDEO_INPUT_PATH; downstream image-out container has none; Job topology parallelism=1, completions=1.

Post-change verification (2026-06-12, all four PRs live, fresh CRDs/agent/claimer): single-source image → direct mode, Job 1/1 Complete in 11s (/ws/image-in.png, extension preserved — previously hung forever); single-source video → direct mode (Added direct-mode claimer … bindings: 1 in the log), completed; multi-source video (2 files, fan-in) completed in 42s; external_videos fileset → queue mode engaged (totalFiles: 2 after the placeholder fix), 2/2 completions, completed; 2-camera stream regression: 3/3 Running, both topics live.

Cross-PR smoke (2026-06-11, all four train PRs live: API #861 → agent #65 → this controller, agent-authored CRs)

  • Found and fixed a real integration bug: the agent applies the PipelineInstance before its PipelineSources (intentional — shared-source cleanup TOCTOU), so the first reconcile set Degraded/PipelineSourceNotFound; the PipelineSource watch then self-healed the deployment but the stale condition was never cleared, and the agent's provisioning monitor tore the instance down on it. Now cleared on the same pass as SourceBindingUnmatched; the two-phase race is pinned by TestReconcile_PipelineSourceNotFound_ClearsOnceSourceExists.
  • Combination matrix, all green after the fix: single source (broadcast-equivalent single binding, 2/2 Running); three sources (4/4 Running, ports 5550/5552/5554, viewer consuming cam_a_main/cam_b_main/cam_c_main all live with correct per-camera meta.src); same media bound to two VideoIns (one shared ps-<media> CR, both bindings resolve to it, 3/3 Running); concurrent instances sharing a media (stopping one swept only its own sources — the survivor's shared source and pod were untouched, three separate times).

Deploy note — claimer image

Direct-download mode needs the claimer built from this PR. plainsightai/openfilter-pipelines-claimer:latest (the controller's default CLAIMER_IMAGE) only refreshes on v* release tags — cut a release after merging (or pin CLAIMER_IMAGE to a sha tag) before relying on multi-source or singleObject batch, or the claimer crashes with STREAM is required (verified empirically against the current latest).

Notes for pipeline authors (also documented in samples + DESIGN_DOCUMENT)

OpenFilter has two authoring constraints that bite every multi-source streaming pipeline. Both surfaced during the smoke test and would otherwise be silent runtime crashes:

  1. Stagger output ports by 2. OpenFilter binds the configured port AND port+1 per outputs (the +1 carries ZMQ control plane). Use 5550, 5552, 5554, … — never 5550, 5551 (would crash the second VideoIn with Address already in use).
  2. Disambiguate fan-in topics — two equivalent schemes (both documented in the samples): publisher-side ;topic tags on each VideoIn's sources (convention for hand-authoring), or subscriber-side remap ;main><name>_main at the fan-in filter (what the plainsight-api export emits — validated end-to-end). Without either, both VideoIns publish on the default topic main and any downstream fan-in crashes with RuntimeError: duplicate topic 'main'.

Neither is enforced by the controller — they live in the filter.config strings. The plainsight-api export (PLAT-1140, plainsight-api#861) bakes both in at export time; hand-authored CRs follow the samples.

For multi-source batch: every binding's PipelineSource must be a Bucket (RTSP makes no sense for batch) and its prefix must be a full object key. The reconciler surfaces violations as MultiSourceBatchInvalidSource / MultiSourceBatchMissingObject Degraded conditions with operator-actionable messages, and clears them once fixed.

Test plan

  • make test green (including 16-spec admission envtest)
  • make lint green (0 issues)
  • CRD admission rule verified live (kubectl apply) and pinned by envtest
  • Streaming multi-source: two real RTSP streams running concurrently in one Pod, isolated per topic
  • Streaming multi-source with platform-exported YAML (API #861 contract)
  • Batch single-source via Sources[]: routes through the new ResolvedSourceBinding path
  • Batch multi-source: Job created with per-binding direct-mode claimers + per-container VIDEO_INPUT_PATH injection

Adds the per-filter source-binding shape so a single PipelineInstance can
run a multi-camera pipeline where each VideoIn filter consumes a distinct
PipelineSource. Streaming mode is fully supported; batch mode keeps
single-source (multi-source batch is tracked as a focused follow-up that
needs the N-init-claimer rework).

CRD changes (api/v1alpha1/pipelineinstance_types.go)

- Spec.SourceRef switches from value to *SourceReference and is marked
  deprecated. Existing single-source CRs continue to apply unchanged.
- New Spec.Sources []NamedSourceRef binds each PipelineSource to a
  specific filter container in the referenced Pipeline by name.
  NamedSourceRef.FilterName is constrained to DNS-1123 label rules so
  the value can flow safely into generated container/volume/env names.
- listType=map + listMapKey=filterName makes Sources stable under
  server-side apply.
- New EffectiveSources() method on PipelineInstanceSpec normalizes
  both shapes into a uniform []NamedSourceRef. Legacy SourceRef
  becomes a one-entry list with FilterName="" — the broadcast sentinel
  the reconciler uses to preserve today's all-containers behavior.

Reconciler changes

- New ResolvedSourceBinding pairs the bound filter name with the
  fetched PipelineSource. resolveSourceBindings (replaces
  getPipelineSource) builds the list once at the top of Reconcile and
  threads it through to reconcileStreaming / reconcileBatch.
- Streaming (pipelineinstance_controller_streaming.go): buildStreamingDeployment
  now splits bindings into a per-filter map and a broadcast slice, then
  injects RTSP_URL / _RTSP_USERNAME / _RTSP_PASSWORD into only the
  matching container plus any broadcast targets. The downstream
  container (webvis, etc.) gets no source env vars and keeps using
  its filter.config `sources: tcp://localhost:...` wiring exactly as
  before.
- Idle-timeout policy anchors to bindings[0].Source for V1; an
  explicit comment flags that "ANY vs ALL idle" semantics for
  multi-source streams is a follow-up decision.
- Batch (pipelineinstance_controller_batch.go): rejects multi-source
  with a clear MultiSourceBatchUnsupported condition; single-source
  batch flows through unchanged via bindings[0].Source. Multi-source
  batch needs the N-init-claimer reshape; tracked separately.

Tests

- pipelineinstance_multisource_test.go: three new test cases pin the
  contract — per-container RTSP_URL on the multi-source path,
  broadcast preserved on the legacy SourceRef path, and EffectiveSources
  normalization.
- 39 existing test sites updated for *SourceReference (was struct,
  now pointer) and the new reconcileBatch/Streaming /
  ensureStreamingDeployment signatures.
- All controller tests pass; controller package coverage 60.7% → 60.9%.

Generated artifacts (regenerated, not hand-edited)

- api/v1alpha1/zz_generated.deepcopy.go
- config/crd/bases/filter.plainsight.ai_pipelineinstances.yaml
- deployment/openfilter-pipelines-controller/crds/filter.plainsight.ai_pipelineinstances.yaml

Sample

- config/samples/pipelines_v1alpha1_pipelineinstance_stream_multisource.yaml
  demonstrates a two-camera binding referencing the front_cam /
  back_cam filter names. Wired into the kustomization.yaml index.
…source

Validated end-to-end against a docker-desktop cluster with a real RTSP
stream: two videoin filters bound to distinct PipelineSources successfully
co-tenant in one Pod and both stream at 24fps. The first smoke attempt
crashed back-cam with `ZMQError: Address already in use (addr='tcp://*:5551')`
because front-cam's `outputs: tcp://*:5550` reserves both 5550 and 5551 —
openfilter binds the configured port AND port+1 for its ZMQ control
channel (https://docs.plainsight.ai/docs/openfilter/your-first-filter/:
"There is one idiosyncrasy with TCP connections to keep in mind, TWO
ports are used, the port number you specify and that port number + 1").

Spacing outputs by 2 (5550, 5552, …) fixed the collision. Adding the
warning to the multi-source sample so anyone copying it as a template
gets the spacing right on the first try.

Controller-side behavior is unaffected — port selection lives in the
user-authored `pipeline.spec.filters[].config` strings; the controller
just plumbs them through verbatim.
Second openfilter authoring constraint surfaced while empirically
validating PR2 against two real RTSP streams (vod2stream-car-truck-person
and vod2stream): every VideoIn must tag its output topic via the
`;topic` suffix in its `sources` config, otherwise both VideoIns
publish to the default topic 'main' and webvis (or any downstream
fan-in filter) crashes with:

  RuntimeError: duplicate topic 'main' from: VideoIn-… @ tcp://…

Fix is the `sources: "$(RTSP_URL);<filter-name>"` form on each
VideoIn plus matching `tcp://…;<topic>` references in the downstream
filter's sources.

Sample comment now covers both authoring constraints (port spacing +
distinct topics) with a complete worked example. Validated end-to-end:
two distinct streams streaming concurrently in one Pod, webvis /data
endpoint returns the multi-topic shape with each entry pinned to its
own `meta.src` URI.

Controller code unchanged — both constraints live in the pipeline
graph that the user (or plainsight-api export) authors.
Two small follow-ups to the PR2 main commit, surfaced while running the
docker-desktop smoke tests against real RTSP streams.

CRD admission rule (`api/v1alpha1/pipelineinstance_types.go`)

  +kubebuilder:validation:XValidation:rule="(has(self.sourceRef) &&
   !has(self.sources)) || (!has(self.sourceRef) && has(self.sources)
   && size(self.sources) > 0)",message="exactly one of `sourceRef`
   or `sources` must be set (sources must be non-empty)"

surfaces the contract violation at apply time instead of silently
letting `EffectiveSources()` pick `Sources` (when both are set) or
returning nil (when neither is set). Verified live against
docker-desktop: a CR with neither field is rejected, a CR with both
is rejected, a CR with only `sources` is accepted.

DESIGN_DOCUMENT.md

- 3.3 PipelineInstance: documents both `sourceRef` (legacy) and
  `sources` (multi-source) shapes with the admission constraint.
- New 5.4 "Multi-source contract" — covers EffectiveSources
  normalization, the per-container env injection rule, and the two
  authoring constraints (port spacing + per-VideoIn topic tags) that
  the smoke tests proved out empirically.
- New 5.5 "Multi-source batch — V1 status" — documents the
  `MultiSourceBatchUnsupported` condition the batch reconciler
  surfaces today.

Generated CRDs regenerated via `make manifests generate helm-update-crds`.

make test + make lint both green; coverage unchanged at 60.9%.
Closes the multi-source batch gap that PR2 originally left as a follow-up.
Multi-camera benchmark runs from plainsight-api (PLAT-1140) now have an
end-to-end controller path; the rejection condition is gone.

Behavior

Batch mode now branches on `len(Sources)`:

  - Single-source (legacy SourceRef or one-entry Sources): unchanged.
    The existing queue-based parallel-files-per-bucket flow keeps
    working bit-for-bit. List bucket → enqueue → N pods pop one file
    each via Valkey → init claimer downloads → filter chain runs.

  - Multi-source (≥2 entries in Sources): `reconcileBatchMultiSource`
    emits a single Pod (parallelism=1, completions=1) with one init
    claimer per binding in **direct mode** plus the user's filter
    containers. Per-VideoIn `VIDEO_INPUT_PATH` is injected into the
    matching container only; downstream containers are untouched.
    Completion is the Job's own Succeeded/Failed status — no
    per-file work-queue accounting.

CRD

`BucketSource.Object` (optional) names a specific S3 object key. Set
this for every binding in multi-source batch so each direct-mode
claimer has a deterministic target; the legacy queue path stays
prefix-based and ignores the field. Two new Degraded conditions
surface configuration errors:

  - `MultiSourceBatchInvalidSource`: a binding's PipelineSource isn't
    a Bucket (RTSP makes no sense for batch).
  - `MultiSourceBatchMissingObject`: a binding's Bucket.Object is
    unset.

Claimer

`cmd/claimer/main.go` learns a direct mode triggered by the new
`S3_OBJECT_KEY` env var: skip Valkey entirely, download exactly that
key to `VIDEO_INPUT_PATH`, exit. Legacy queue mode is unchanged when
the env is empty. `STREAM` and `GROUP` are now required only in queue
mode.

Reconciler

`reconcileBatch` lost the `MultiSourceBatchUnsupported` rejection and
gained a one-line branch into the new path. The single-source code
path is otherwise untouched.

New file `pipelineinstance_controller_batch_multisource.go` carries:
- `reconcileBatchMultiSource` (validation, Job ensure, status sync)
- `buildMultiSourceBatchJob` (Job builder)
- `buildDirectClaimerEnv` (per-binding direct-mode env)
- `buildBatchFilterContainersForMultiSource` (per-container env injection)
- `perFilterInputPath` (download path naming)

Tests

`TestBuildMultiSourceBatchJob_PerBindingInitClaimersAndEnv` pins:
- One init claimer per binding, named `claimer-<filterName>`
- Each claimer's S3_OBJECT_KEY + VIDEO_INPUT_PATH match its binding
- No Valkey env on direct-mode claimers
- Each VideoIn filter container has VIDEO_INPUT_PATH set; downstream
  filters do not
- Job topology: parallelism=1, completions=1

Make test + make lint both green.

Validated empirically on docker-desktop K8s with a fake S3 bucket
(claimers can't actually download but the controller-side shape is
the validation target): 2 init claimers named correctly, each with
its specific S3_OBJECT_KEY + path, per-VideoIn env on both
VideoIn filter containers, downstream `image-out` container left
clean.

DESIGN_DOCUMENT.md sections 3.3 and 5.5 updated to reflect that batch
multi-source is now supported and documents the
`MultiSourceBatchInvalidSource` / `MultiSourceBatchMissingObject`
operator-actionable failure modes.
@lucasmundim lucasmundim changed the title feat: multi-source pipeline instances (PLAT-1141) feat: multi-source pipeline instances — streaming + batch (PLAT-1141) Jun 10, 2026
@lucasmundim lucasmundim self-assigned this Jun 10, 2026
Adds tests for the parts of the PR that were happy-path-only after the
PLAT-1141 work. Coverage gains:

  - reconcileBatchMultiSource:  0% → 73.9%
  - resolveSourceBindings:     80% → 86.7%
  - cmd/claimer/loadConfig:    68.8% → 87.5%

Controller package overall: 58.6% → 63.0%. Claimer package: 17.6% → 19.1%.

New test files / functions

internal/controller/pipelineinstance_controller_batch_multisource_test.go
  Pins the six branches of reconcileBatchMultiSource against a fake
  client with status-subresource support:
    1. RejectsNonBucketSource             → MultiSourceBatchInvalidSource
    2. RejectsMissingObject               → MultiSourceBatchMissingObject
    3. CreatesJobAndStampsStartTime       → happy-path Job create
    4. JobCompleteMarksSucceeded          → JobComplete → PI Succeeded
    5. JobFailedMarksDegraded             → JobFailed → PI Degraded
                                            (Reason/Message propagate)
    6. JobProgressingStaysProgressing     → active Job → PI Progressing

internal/controller/pipelineinstance_multisource_test.go
  Two new tests for the streaming + resolver paths:
    * MultiSource_WithCredentials — credentials-bearing per-binding RTSP
      source must inject _RTSP_USERNAME / _RTSP_PASSWORD as secretKeyRefs
      into ONLY the matching VideoIn container; one-with, one-without
      coexist; credential secret must not leak into the credential-less
      sibling.
    * ResolveSourceBindings_SourceMissingSurfacesActionableError +
      ResolveSourceBindings_LegacySourceRefMissingSurfacesError —
      operator-quality error wording for both shapes.

cmd/claimer/main_test.go
  TestLoadConfig_DirectMode (table) — five sub-tests covering the
  S3_OBJECT_KEY conditional STREAM/GROUP relaxation, queue-mode-still-
  requires-STREAM, queue-mode-still-requires-GROUP, S3_BUCKET still
  required in either mode, and VIDEO_INPUT_PATH default in direct mode.

Known gap: cmd/claimer/main.go:run() direct-download branch stays at 0%
unit coverage. Testing it would require mocking the minio client or
spinning up a fake S3 server inside the test process; the function is
small delegation (loadConfig → createMinIOClient → downloadFile) and is
validated end-to-end by the docker-desktop smoke test that proved the
init claimers receive the correct env vars and execute.

make test + make lint both green.
@lucasmundim

Copy link
Copy Markdown
Contributor Author

Empirical validation against the real multi-camera filter chain

Reviewer asked about validation against the premium multi-camera filters. Tested filter-rt-detr:0.1.2 (the upstream stage of the full reid + global-association multi-camera chain) end-to-end on docker-desktop K8s in CPU mode.

Pipeline shape:

front-cam (RTSP A) -> rt-detr-front (CPU) -> webvis (front_cam topic)
back-cam  (RTSP B) -> rt-detr-back  (CPU) -> webvis (back_cam topic)

Two distinct RTSP test streams bound via the new Spec.Sources shape, each rt-detr running independent CPU inference with its own FILTER_CAMERA_ID.

Result: 5/5 containers Running, both cameras producing distinct content-appropriate detections.

webvis /data aggregated output:

Topic Bound RTSP source Detections Labels found
front_cam vod2stream-car-truck-person/car-truck-person 35 bus, car, cell phone, person, truck
back_cam vod2stream/stream 12 cell phone, person

The traffic video gets vehicle labels; the people video gets person labels. That's proof — not inference — that each filter-rt-detr container received only its bound camera's stream and is keeping independent per-camera detection state (each track carried its own local_track_id).

What this rules out: binding swaps, broadcast leaks, cross-camera state contamination — all of which would show up as identical detection sets on both topics or identical src URIs in the metadata.

Caveat (called out for transparency): CPU inference latency was ~100s/frame on docker-desktop. That's the price of running rt-detr without GPU; the controller's responsibility (per-container env injection, downstream wiring through openfilter's ZMQ topology) is unaffected. Real deployments on a GPU cluster get real-time inference. Adding the downstream filter-tracking-reid and filter-global-association-multicamera-tracking stages to this validated chain is pipeline authoring — same controller plumbing, more containers.

…PATH contract

Three fixes from the cross-repo contract review:

- Stream multisource sample used `filterName: front_cam` / `back_cam` —
  rejected at admission by the CRD's own DNS-1123 pattern, and
  mismatching the comment's `front-cam` / `back-cam` Pipeline filters.
  All names (filterName, filter names, topics) now align hyphenated so
  there is one name per camera end-to-end, matching the sample's stated
  convention.

- Single-source batch now injects VIDEO_INPUT_PATH (= the claimer's
  download destination, spec.videoInputPath) into every filter
  container, mirroring the per-binding injection the multi-source path
  already does. This lets exported Pipelines author VideoIn sources as
  `file://$(VIDEO_INPUT_PATH)` uniformly — the plainsight-api export
  (PLAT-1140) switches to that form so the same exported YAML serves
  1..N sources. Existing CRDs with hardcoded /ws/input.mp4 keep working
  (env injection is additive); injected before filter.Env so a
  user-supplied value still wins.

- Added the batch multisource sample the design doc calls for
  (direct-mode Job, per-binding claimers, Bucket.Object constraints,
  $(VIDEO_INPUT_PATH) authoring) and registered both multisource
  samples in kustomization.
Two fixes from the cross-repo PLAT-1071 review:

1. VIDEO_INPUT_PATH now precedes the FILTER_* config env in BOTH batch
   builders. Kubernetes dependent-env expansion only resolves $(VAR)
   references to variables defined earlier in the list — with the old
   config-first ordering, the `sources: file://$(VIDEO_INPUT_PATH)`
   authoring contract arrived in the container as a literal unexpanded
   string and multi-source batch was dead on arrival (the streaming
   builder already ordered RTSP env first and documents why). Ordering
   assertions added to the multisource unit test and the single-source
   envtest — the prior tests only asserted presence/value, which is how
   this slipped through.

2. Direct-mode claimers resolve their object key as Bucket.Object when
   set, else Bucket.Prefix used as the full key (new bindingObjectKey).
   The platform's media model has no separate object concept — the
   agent has always shipped single-file object keys in `prefix`, and
   the legacy queue path relied on prefix-as-key listing. The previous
   hard requirement on Bucket.Object (a field this PR itself introduced)
   would have Degraded every agent-deployed multi-source batch
   (MultiSourceBatchMissingObject) since the agent never populates it.
   Degrade now only when neither is set. Relatedly, the queue seeder
   honors an explicit Object (enqueue exactly that key, no prefix scan)
   so a hand-authored single-binding CR with Object doesn't silently
   process the whole prefix.
The Object field was invented by this PR and never had a producer: the
platform's media model carries single-file object keys in `prefix` (the
agent parses the media URL path into it) and the legacy queue path has
always relied on prefix-as-key listing. Keeping an optional field nobody
sets is exactly the drift trap the cross-repo review caught (the
multi-source path hard-required it while the agent never populated it).
Since the field never shipped (unmerged PR), removing it is free.

- bindingObjectKey resolves Bucket.Prefix as the full object key;
  multi-source batch Degrades (MultiSourceBatchMissingObject) only when
  the prefix is empty.
- Queue-seeder early return removed with the field.
- CRD bases + Helm chart copies regenerated in lockstep; sample and
  design comments updated to the prefix-as-full-key contract.
The stream multisource sample presented publisher-side ;topic tags as
the only way to avoid the duplicate-topic fan-in crash. The
plainsight-api export actually emits the other equivalent form —
VideoIns stay on default `main`, the fan-in filter remaps per upstream
(;main><name>_main) — now empirically validated end-to-end on
docker-desktop (both remapped topics at full frame rate). The comment
presents both forms and says which one the platform produces, so
hand-authors and reviewers don't conclude exported pipelines are
miswired.
… bindings, name caps

Review findings 2.2-2.9:

- 2.2 Degraded validation states self-heal: SetupWithManager gains a
  PipelineSource→PipelineInstance watch (maps source events to every
  referencing instance via EffectiveSources, covering sources[] and the
  legacy singular ref), and clearDegradedReason (now variadic) flips
  Degraded back to False once multi-source batch validation passes —
  previously a fixed PipelineSource never re-triggered reconcile and
  instances reported Degraded=True + Succeeded=True forever.
- 2.3 A sources[].filterName matching no pipeline.spec.filters[].name
  now Degrades with SourceBindingUnmatched (message lists unmatched +
  available names) in ALL modes — was a silent no-op leaving the
  mistargeted VideoIn inputless while the instance reported Running.
  Cleared automatically once bindings validate.
- 2.5 claimer-<filterName> container names cap at 63 chars
  (deterministic 55-char filterName budget, trailing-hyphen trim) — a
  CRD-valid 56+ char filterName previously produced an apiserver-
  rejected Job and an error-requeue loop.
- 2.6 Stale comment promising uninjected env fixed.
- 2.7/2.8 DESIGN_DOCUMENT: §5.4 no longer claims batch multi-source is
  rejected (contradicting §5.5); documents the unmatched-binding
  validation, the idle-timeout sourceBindings[0] anchoring caveat, and
  the prefix-as-key contract (stale Bucket.Object reference removed).
- 2.9 Multisource unit tests use CRD-valid hyphenated filter names.

New tests: Degraded-clears-on-recovery reconcile cycle, claimer name
truncation, watch mapping fn, unmatched-binding Degradation in both
modes + clear-on-fix.
- envtest specs for the PipelineInstance admission rules the agent and
  portal PRs build on: the sourceRef/sources CEL xor (both, neither,
  empty list), listMapKey uniqueness on sources[].filterName, and the
  DNS-1123 filterName pattern (16 specs against a real API server).
- table test for perFilterInputPath (claimer↔filter path contract:
  extension preservation, .mp4 default, edge cases).
- drop the Deprecated marker on spec.sourceRef: it is the first-class
  convenience form for single-source pipelines (broadcast semantics, no
  filter names needed), supported indefinitely alongside sources[].
  Removes the now-stale SA1019 lint suppressions; CRDs regenerated and
  Helm copies synced.
…ces resolve

Found live in the cross-PR smoke test: the deployment agent applies the
PipelineInstance before its PipelineSources (intentional — TOCTOU fix),
so the first reconcile sets Degraded/PipelineSourceNotFound. The
PipelineSource watch re-reconciled and built the deployment, but the
stale condition was never cleared (clear-on-pass only covered
SourceBindingUnmatched) — and the agent's provisioning monitor tears
the instance down on any lingering Degraded. Add the reason to the
existing variadic clear and pin the two-phase race in a regression
test.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends the controller to support multi-source PipelineInstances (streaming + batch) by introducing per-filter source bindings while preserving the legacy single-source convenience form.

Changes:

  • Adds spec.sources[] (list-map keyed by filterName) plus EffectiveSources() normalization, and updates CRD admission (CEL xor rule) to enforce exactly-one-of sourceRef/sources.
  • Implements controller-side multi-source reconciliation: per-container RTSP env injection for streaming, and a new single-pod, multi-init-claimer “direct download” Job path for multi-source batch.
  • Adds a new claimer “direct mode” (S3_OBJECT_KEY) and broad test/docs/sample updates to pin the contract.

Reviewed changes

Copilot reviewed 20 out of 21 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
internal/controller/pod_labels_test.go Updates streaming deployment builder call signature for resolved source bindings.
internal/controller/pipelineinstance_streaming_update_test.go Updates streaming update tests for SourceRef pointer + bindings slice.
internal/controller/pipelineinstance_multisource_test.go Adds comprehensive multi-source unit tests (streaming env routing, batch job shape, binding validation, watch mapping, etc.).
internal/controller/pipelineinstance_controller.go Introduces ResolvedSourceBinding, binding resolution, unmatched-binding validation, Degraded clearing improvements, and a PipelineSource watch+mapper.
internal/controller/pipelineinstance_controller_test.go Updates controller integration tests for SourceRef pointer and validates VIDEO_INPUT_PATH ordering.
internal/controller/pipelineinstance_controller_streaming.go Refactors streaming path to accept resolved bindings and inject RTSP env per-container; anchors idle-timeout to first binding.
internal/controller/pipelineinstance_controller_reconcile_span_test.go Updates reconcile-span tests to pass bindings slice and pointer SourceRef.
internal/controller/pipelineinstance_controller_batch.go Routes multi-source batch to new reconciler; fixes env ordering by injecting VIDEO_INPUT_PATH before FILTER_*.
internal/controller/pipelineinstance_controller_batch_multisource.go Adds new multi-source batch reconciler + job builder (N init claimers, direct mode, per-VideoIn VIDEO_INPUT_PATH).
internal/controller/pipelineinstance_controller_batch_multisource_test.go Adds state-machine coverage for multi-source batch reconciler and helper behavior (naming, paths).
internal/controller/pipelineinstance_admission_test.go Adds envtest coverage pinning CRD admission validation (xor rule, listMapKey uniqueness, filterName pattern).
DESIGN_DOCUMENT.md Documents the new multi-source contract (streaming + batch constraints and behavior).
deployment/openfilter-pipelines-controller/crds/filter.plainsight.ai_pipelineinstances.yaml Updates rendered CRD schema/docs: sources, sourceRef description, CEL xor validation.
config/samples/pipelines_v1alpha1_pipelineinstance_stream_multisource.yaml Adds sample for multi-source streaming authoring + constraints.
config/samples/pipelines_v1alpha1_pipelineinstance_batch_multisource.yaml Adds sample for multi-source batch direct-mode behavior + constraints.
config/samples/kustomization.yaml Includes new sample manifests.
config/crd/bases/filter.plainsight.ai_pipelineinstances.yaml Updates CRD base with sources and CEL xor validation.
cmd/claimer/main.go Adds direct-download mode via S3_OBJECT_KEY and relaxes STREAM/GROUP requirements in that mode.
cmd/claimer/main_test.go Adds tests covering direct-mode config loading and validation behavior.
api/v1alpha1/zz_generated.deepcopy.go Updates deep-copies for new types and SourceRef pointer + Sources slice.
api/v1alpha1/pipelineinstance_types.go Adds sources, makes sourceRef optional pointer, adds NamedSourceRef and EffectiveSources(), and updates validation/docs.
Files not reviewed (1)
  • api/v1alpha1/zz_generated.deepcopy.go: Generated file

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread internal/controller/pipelineinstance_controller.go
Comment thread internal/controller/pipelineinstance_controller.go Outdated

@leandrobmarinho leandrobmarinho left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the controller-side multi-source support in depth. The binding->source mapping is correct: streaming splits bindings into a per-filter map + broadcast slice and injects each container's matching RTSP source (env ordered before FILTER_* so dependent-env expands); multi-source batch emits one claimer per binding writing /ws/ and injects VIDEO_INPUT_PATH into only the matching VideoIn; an unmatched filterName is caught up-front as Degraded, not silently dropped. Single-source backward-compat is preserved (sourceRef normalizes to a broadcast binding; CRD XValidation enforces exactly-one-of sourceRef/sources; deepcopy + the config/crd vs Helm CRD copies are in sync). Reconcile is idempotent (Get-then-Create/Patch), conditions use SetStatusCondition so no LastTransitionTime churn, and RBAC already covers pipelinesources. CI green. Looks good.

@shingonoide shingonoide left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed at f54935f. This is a clean, well-tested implementation of the multi-source controller half. The condition lifecycle work stands out: the self-healing Degraded clears, the PipelineSource/Pipeline watch map-functions, and the apply-order race fix (instance applied before its sources, then stale Degraded cleared on recovery) are exactly the kind of integration corners that bite in production, and they are pinned by tests. Status-update conflict handling that requeues with fresh state rather than continuing on a mutated Conditions slice is a nice touch. CI is green.

A few non-blocking notes:

  1. The ticket drafted a FILTER_SOURCES + deterministic-port (5550+index) injection scheme in the controller; the implementation instead injects per-container RTSP_URL (streaming) and VIDEO_INPUT_PATH (batch) and leaves topic/port fan-in to the filter sources: config. The PR body explains and validates this pivot with the 3-camera run, so this is just a request to keep that note discoverable (a one-liner in the design doc noting the controller no longer owns the port scheme would save the next reader the cross-check).

  2. filterName is constrained to DNS-1123 labels (lowercase alphanumeric + hyphen, no underscore). That is the right constraint since the name flows into container/volume/env names, but it is stricter than the original ticket text. Please confirm the deployment-agent (#65) and portal (#429) validate filterName identically so they never emit a name the CRD rejects at admission.

  3. Minor: a single-entry sources[] on a batch instance takes the legacy queue path and the binding's filterName is not used for routing (broadcast-equivalent). This is the intended no-regression behavior for one source and the unmatched-name validation still runs, so no change needed; a one-line comment noting it would help future readers.

Nothing here blocks. Nice work on the operability hardening.

@jmiller-plainsight jmiller-plainsight left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review: Multi-Source Pipeline Instances (PLAT-1141)

Overall, this is an exceptionally high-quality, clean, and thoroughly tested PR. The API design, normalizations, and watchers are beautifully engineered. I have some minor suggestions regarding propagating status update errors on validation failures to ensure the degraded status is always persisted properly, and a small documentation note.

Comment thread internal/controller/pipelineinstance_controller.go
Comment thread internal/controller/pipelineinstance_controller_batch_multisource.go Outdated
Comment thread internal/controller/pipelineinstance_controller_batch_multisource.go Outdated
Comment thread internal/controller/pipelineinstance_controller_batch_multisource.go Outdated
Comment thread internal/controller/pipelineinstance_controller_batch.go
… buckets

New BucketSource.singleObject field: declares the prefix is a FULL
object key (set by the deployment agent for single-file media kinds).
A single named binding whose bucket declares it now routes through the
direct-download path — one init claimer with S3_OBJECT_KEY, no Valkey,
and the extension-preserving /ws/<filterName><ext> input path —
instead of queue mode, whose fixed input.mp4 destination destroys the
file extension and silently breaks extension-sensitive entry filters
(image-in finds zero images and exits; the Job hangs forever on the
waiting downstream). Found empirically testing batch image pipelines.

Backward compatible: bindings without the flag (hand-authored CRs,
fileset prefixes) keep the queue flow; the legacy broadcast sentinel
(singular sourceRef) is never routed direct. CRDs regenerated, Helm
copies synced.
Zero-byte keys ending in "/" (GCS/S3 console "folders") were enqueued
as work items in queue mode; the placeholder then failed its
download/decode attempts, landed in the DLQ, and failed the whole run.
Observed empirically: an external_videos fileset over a console-created
GCS folder processed both real videos (2/3 completions) and then failed
the run on the "videos/" placeholder. Skip them at listing time;
zero-byte regular files and non-empty trailing-slash keys are kept.
…ropagation, cross-ns watch

Review feedback on #74:
- Validation failures (unmatched filterName, multi-source batch invalid
  source / missing object key) now set Progressing=False alongside
  Degraded so the condition set reads consistently for automation;
  Succeeded is left untouched (it records a past terminal outcome).
- Status-update errors on those validation paths are propagated
  (conflict → requeue, else return the error) instead of being logged
  and swallowed — these paths intentionally don't requeue, so a
  swallowed write failure left the server-side status stale forever.
  Batch validation degrade centralized in degradeBatchValidation.
- pipelineInstancesForPipelineSource lists cluster-wide: a
  cross-namespace sourceRef (ref.namespace set) now wakes its instance
  on source events exactly like a same-namespace one; the per-ref
  namespace resolution already handled correctness.
- Steady-state batch reconcile (30s loop) skips the no-op status write
  when the Progressing condition is unchanged.
- Docs: DESIGN_DOCUMENT notes the port/topic ownership pivot
  (authoring-time, not controller-owned); reconcileBatch documents that
  the queue path's filterName is broadcast-equivalent.

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the thorough pass — all three notes addressed or answered:

1. Port-scheme ownership note: added to DESIGN_DOCUMENT.md in c7310c9 — a paragraph under the authoring-constraints section records the pivot from the ticket's controller-injected FILTER_SOURCES/port scheme to authoring-time wiring (API export or hand-authored), with the controller owning only per-binding env injection.

2. filterName validation alignment across repos: confirmed identical end-to-end. The API validates instanceName (which becomes filterName) against the same DNS-1123 regex fail-closed at instance create (validation.InstanceNameRegex, plainsight-api#861), the portal enforces it in the editor with the same pattern (INSTANCE_NAME_PATTERN, client-portal#429), and the agent passes through only API-validated names — the CRD pattern is the backstop, and the admission envtest in this PR pins it.

3. Single-entry sources[] comment: added on reconcileBatch in c7310c9 (same as jmiller's note). Also worth flagging since your review snapshot at f54935f: single named bindings with singleObject: true buckets now take the direct path (a5697a5) — the queue path and its broadcast-equivalence remain for everything else.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 23 out of 24 changed files in this pull request and generated 3 comments.

Files not reviewed (1)
  • api/v1alpha1/zz_generated.deepcopy.go: Generated file

Comment thread internal/controller/pipelineinstance_controller_streaming.go
Comment thread DESIGN_DOCUMENT.md Outdated

@shingonoide shingonoide left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the multi-source CRD plus both reconcilers against PLAT-1141. The CEL xor on sourceRef / sources correctly rejects both-set and neither-set (including the empty sources: [] case via the explicit size() > 0), EffectiveSources() normalizes both forms into one path, and the generated deepcopy deep-copies the new slice rather than aliasing it. The per-binding env injection is ordered before FILTER_* on every builder so the $(RTSP_URL) / $(VIDEO_INPUT_PATH) expansion resolves, and the batch direct-download path preserves the file extension (the fix for the image-in case). The PipelineSource to PipelineInstance watch plus the Degraded self-heal closes the PI-before-sources creation race. CI green, including CRD-in-sync and chart parity.

Two non-blocking test-coverage gaps worth a follow-up:

  1. The GPU coexistence with PR #66 is wired on the streaming and batch multi-source paths via the shared applyGPUContainerSharing, but no test asserts nvidia.com/gpu / NVIDIA_VISIBLE_DEVICES placement on a Spec.Sources instance (the 3-cams + 1-GPU-filter topology the ticket calls out).
  2. The streaming RTSP_URL-before-FILTER_* ordering is correct in code but only the batch VIDEO_INPUT_PATH ordering is asserted by a test (assertEnvPrecedesFilterConfig). A streaming-side assertion would guard against future reordering regressions.

Approving. Reminder for the rollout: this needs a v* release (or a pinned CLAIMER_IMAGE) post-merge since claimer:latest predates direct mode, as the ticket notes.

….Object references

Review (Copilot round 2):
- The V1 idle-timeout policy anchored to sourceBindings[0], but
  spec.sources is listType=map — element order is not semantically
  stable across patches. Anchor to the lexicographically-smallest
  filterName instead (idleAnchorBinding, table-tested), and log which
  binding anchored the timeout.
- DESIGN_DOCUMENT.md and a test comment still referenced the removed
  Bucket.Object field — corrected to the prefix-is-full-object-key
  contract.
…ring assertions

Review (shingonoide): two coverage gaps closed —
- TestBuildStreamingDeployment_MultiSource_GPUCoexistence pins the
  3-cams + 1-GPU-filter topology on a Spec.Sources instance: exactly one
  container holds the nvidia.com/gpu limit, the GPU filter carries
  NVIDIA_VISIBLE_DEVICES (and no RTSP_URL), the camera containers carry
  their per-binding RTSP_URL and no GPU env.
- TestBuildStreamingDeployment_RTSPURLPrecedesFilterConfig asserts the
  streaming-side RTSP_URL-before-FILTER_* ordering via the same
  assertEnvPrecedesFilterConfig helper that already guards the batch
  VIDEO_INPUT_PATH ordering.

@lucasmundim lucasmundim left a comment

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks — both coverage gaps closed in the latest push:

1. GPU coexistence on multi-source: TestBuildStreamingDeployment_MultiSource_GPUCoexistence pins the exact 3-cams + 1-GPU-filter topology on a Spec.Sources instance — exactly one container holds the nvidia.com/gpu limit, the GPU filter carries NVIDIA_VISIBLE_DEVICES (and no RTSP_URL), and each camera container carries its own per-binding RTSP_URL with no GPU env.

2. Streaming env ordering: TestBuildStreamingDeployment_RTSPURLPrecedesFilterConfig asserts RTSP_URL precedes every FILTER_* entry via the same assertEnvPrecedesFilterConfig helper that already guards the batch VIDEO_INPUT_PATH ordering.

And yes on the rollout reminder — the claimer release requirement is recorded in the PR's deploy note, the plan doc's merge-order section, and PLAT-1141's as-built notes, so it can't get lost between merge and deploy.

@lucasmundim lucasmundim merged commit aa65ab5 into main Jun 12, 2026
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants