Skip to content

feat: Unified CMSF/Hang pipeline#1429

Closed
ksletmoe-aws wants to merge 2 commits into
moq-dev:mainfrom
ksletmoe-aws:sletmoe/cmsf-unified-pipeline
Closed

feat: Unified CMSF/Hang pipeline#1429
ksletmoe-aws wants to merge 2 commits into
moq-dev:mainfrom
ksletmoe-aws:sletmoe/cmsf-unified-pipeline

Conversation

@ksletmoe-aws
Copy link
Copy Markdown
Contributor

Unify CMSF support into the existing hang/CMAF pipeline. Supersedes #1408, addressing all review feedback.

Approach

Instead of a parallel CMSF pipeline (~2400 lines duplicating CMAF logic), this extends the existing pipeline with targeted additions (+1658 lines, no duplication). The key insight: CMSF is CMAF with a different catalog format. The relay is format-agnostic, so the difference is purely at the edges.

Changes

MSF catalog support (subscribe side):

  • Add SAP and jitter fields to moq_msf::Track
  • Add MsfConsumer that parses MSF catalogs and converts to hang::Catalog
  • Add --catalog msf flag to moq-cli subscribe
  • Wire SAP type derivation and jitter into to_msf() on publish side

Explicit group mode (multi-encoder redundancy):

  • Add with_explicit_groups() and start_group(sequence) to Fmp4 importer
  • Caller-controlled group boundaries for all tracks simultaneously

Slim C API for CMAF passthrough:

  • moq_track_create with versioned moq_track_config struct
  • Mode B: moq_broadcast_start_group + moq_broadcast_write (aligned groups)
  • Mode A: moq_group_open + moq_group_write + moq_group_close (per-track)
  • moq_track_close with auto-finish of open groups
  • No CMAF muxing in the C API; caller provides pre-built moof+mdat

JS decoder improvement:

  • Video/audio decoders fall back to init segment description when catalog lacks one

How it addresses #1408 feedback

  • "Support both via intermediate representation" → hang::Catalog is the IR, one pipeline, two catalog serializations
  • "Don't copy-paste shared CMAF code" → Zero duplicated code
  • "Need --catalog msf" → Done
  • "C API should be a struct" → moq_track_config with size field for ABI compat
  • "Don't want C API to be a CMAF muxer" → It's not; just track/group lifecycle
  • "Don't want both description and init segment" → Only init_data in the config

Testing

  • 133 tests pass (18 libmoq + 9 moq-msf + 106 moq-mux)
  • JS typecheck and build pass
  • Integration tested: relay + publish + web player via MSF catalog

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 20, 2026

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR adds a slim CMAF passthrough publishing path with C FFI bindings, MSF catalog schema and consumer/producer support, and explicit group control for FMP4 importing. JS audio/video decoders now fall back to CMAF init-segment descriptions when catalog descriptions are missing. The Rust publishing layer introduces TrackConfig, SlimTrack state, and methods for Mode A/B group lifecycles; the libmoq FFI exposes track/group create/write/close functions and new group-related error codes. MSF support includes SAP/jitter fields, an MSF consumer converting MSF→hang catalog, producer SAP mapping, and CLI/exporter wiring to select catalog format.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat: Unified CMSF/Hang pipeline' accurately summarizes the main objective of extending the existing pipeline to unify CMSF support, which aligns with the primary change scope across multiple modules.
Description check ✅ Passed The description provides a detailed explanation of the approach, changes made (MSF catalog support, explicit group mode, slim C API, JS decoder improvements), how it addresses prior feedback, and testing results—all directly related to the changeset.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@js/watch/src/audio/decoder.ts`:
- Around line 275-279: The supported() probe currently ignores the CMAF
description fallback and may reject playable tracks; update the supported()
implementation to mirror the decode fallback: when config.codec !== "opus" use
config.description ? Util.Hex.toBytes(config.description) : init.description (or
its byte form as appropriate) when constructing the probe input so that
init.description is used if catalog config.description is absent, matching the
video decoder's behavior; modify the supported() logic to reference config,
init, and Util.Hex.toBytes accordingly.

In `@js/watch/src/video/decoder.ts`:
- Line 365: supported() currently omits the CMAF init.description fallback used
when calling decoder.configure(), causing valid configs to be rejected; update
supported() to compute description the same way as in configure() by using
this.config.description ? Util.Hex.toBytes(this.config.description) :
init.description (or equivalent) wherever description is evaluated, ensuring the
same fallback logic and referencing the same symbols (supported(), configure(),
this.config.description, init.description) so CMAF video configs with only
init.description are accepted.

In `@rs/moq-mux/src/catalog/msf_consumer.rs`:
- Around line 142-155: The conversion function container_from_msf (and the other
MSF->internal conversion sites noted) is dropping track metadata—description,
jitter, and un-decoded init_data for non-CMAF—causing lost decoder hints; update
container_from_msf and the other conversion locations to preserve
track.description and track.jitter into the corresponding Container variants and
to carry init_data through (decode base64 for CMAF as currently done, but for
Legacy/LOC preserve the raw init_data string or bytes instead of setting None),
i.e., map moq_msf::Track.{description, jitter, init_data} into Container::Legacy
and Container::Cmaf fields (decode when needed) rather than hardcoding
description/jitter to None. Ensure you update all occurrences referenced (the
other conversion branches around the file) to keep consistent field population.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 76bb0e2c-13fe-4d9a-a748-88bdfb9e89a7

📥 Commits

Reviewing files that changed from the base of the PR and between b850c9f and 4e57d6f.

📒 Files selected for processing (15)
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/test.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-msf/src/lib.rs
  • rs/moq-mux/src/catalog/mod.rs
  • rs/moq-mux/src/catalog/msf_consumer.rs
  • rs/moq-mux/src/catalog/producer.rs
  • rs/moq-mux/src/export/fmp4.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/test/mod.rs

Comment thread js/watch/src/audio/decoder.ts
Comment thread js/watch/src/video/decoder.ts
Comment thread rs/moq-mux/src/catalog/msf_consumer.rs
@ksletmoe-aws ksletmoe-aws force-pushed the sletmoe/cmsf-unified-pipeline branch 2 times, most recently from 7a69704 to 90255ba Compare May 20, 2026 08:55
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
rs/moq-mux/src/catalog/msf_consumer.rs (1)

142-155: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve Legacy/LOC init_data as decoder description during MSF→hang conversion.

The conversion currently drops non-CMAF decoder config by hardcoding description to None, even though producer-side Legacy/LOC tracks serialize this data into init_data. That breaks metadata roundtrip and can leave downstream decoders without required config bytes.

🔧 Proposed fix
+use bytes::Bytes;
+
+fn decode_legacy_description(track: &moq_msf::Track) -> anyhow::Result<Option<Bytes>> {
+	track
+		.init_data
+		.as_deref()
+		.map(|b64| {
+			base64::engine::general_purpose::STANDARD
+				.decode(b64)
+				.map(Bytes::from)
+				.with_context(|| format!("MSF track {:?} has malformed init_data", track.name))
+		})
+		.transpose()
+}
+
 fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<VideoConfig>> {
 	let Some(container) = container_from_msf(track)? else {
 		return Ok(None);
 	};
+	let description = match track.packaging {
+		moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => decode_legacy_description(track)?,
+		_ => None,
+	};
 
 	Ok(Some(VideoConfig {
 		codec,
-		description: None,
+		description,
 		...
 	}))
 }
 
 fn audio_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<AudioConfig>> {
 	let Some(container) = container_from_msf(track)? else {
 		return Ok(None);
 	};
+	let description = match track.packaging {
+		moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => decode_legacy_description(track)?,
+		_ => None,
+	};
 
 	Ok(Some(AudioConfig {
 		codec,
 		...
-		description: None,
+		description,
 		...
 	}))
 }

Also applies to: 176-178, 220-221

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-mux/src/catalog/msf_consumer.rs` around lines 142 - 155, The
conversion drops non‑CMAF decoder config by always setting Legacy/LOC decoder
description to None; update container_from_msf to preserve init_data for
moq_msf::Packaging::Loc and ::Legacy: if track.init_data.is_some() base64-decode
it (with the same with_context-style errors used for CMAF) and pass the decoded
bytes as the decoder description into the Container variant instead of None.
This requires changing the Container::Legacy representation (and all other call
sites that construct Container::Legacy, including the similar constructions
referenced around the other affected match arms at the other locations) to
accept an Option<Vec<u8>> or Vec<u8> description, and update the other
constructors at the noted spots (the ones currently setting description: None)
to provide Some(decoded_bytes) or None when missing, preserving error contexts
for malformed base64.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/libmoq/src/publish.rs`:
- Around line 174-176: The code currently coerces missing audio metadata to zero
(setting sample_rate and channel_count from
config.sample_rate.unwrap_or(0)/config.channel_count.unwrap_or(0)), which
produces invalid renditions; change the logic in publish.rs to explicitly
validate config.sample_rate and config.channel_count after parsing
AudioCodec::from_str(&config.codec) and return an error when either is None
(with a clear message like "missing audio sample_rate" / "missing audio
channel_count") instead of defaulting to 0, then use the unwrapped values (e.g.,
sample_rate = config.sample_rate.unwrap()) for downstream processing so invalid
configs fail fast.
- Around line 49-53: The slim slabs (tracks: NonZeroSlab<SlimTrack> and groups:
NonZeroSlab<moq_lite::GroupProducer>) currently outlive Publish::close and allow
stale handles; modify SlimTrack and/or GroupProducer to record the owning
broadcast/track ID (e.g. store an owner key in SlimTrack and in GroupProducer
point to its parent track), update Publish::track_create and Publish::group_open
to populate that ownership, and on Publish::close and moq_track_close (and
moq_publish_close) iterate and remove/drain any slab entries whose owner matches
the closing broadcast/track so groups are implicitly finished and track entries
are reclaimed; ensure the existing close paths call the slab removal logic so no
slim handles remain valid after close.
- Around line 211-219: broadcast_start_group currently mutates tracks one-by-one
which can leave state inconsistent on partial failure; change it to a two-phase
(prepare/commit) or transactional approach: first collect the matching tracks
from self.tracks, then for each prepare by calling
track.producer.create_group(...) and storing the new Group and the previous
group value in temporaries (but don't assign into track.group yet), and also
call prev.finish() into temporaries if needed; if all preparations succeed,
commit by assigning the new Group into each track.group; if any preparation
fails, roll back by finishing any created groups and restoring any previous
state; reference track.producer.create_group, track.group, and prev.finish in
your changes.

---

Duplicate comments:
In `@rs/moq-mux/src/catalog/msf_consumer.rs`:
- Around line 142-155: The conversion drops non‑CMAF decoder config by always
setting Legacy/LOC decoder description to None; update container_from_msf to
preserve init_data for moq_msf::Packaging::Loc and ::Legacy: if
track.init_data.is_some() base64-decode it (with the same with_context-style
errors used for CMAF) and pass the decoded bytes as the decoder description into
the Container variant instead of None. This requires changing the
Container::Legacy representation (and all other call sites that construct
Container::Legacy, including the similar constructions referenced around the
other affected match arms at the other locations) to accept an Option<Vec<u8>>
or Vec<u8> description, and update the other constructors at the noted spots
(the ones currently setting description: None) to provide Some(decoded_bytes) or
None when missing, preserving error contexts for malformed base64.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2815b799-c07b-42fb-9b9c-3d8b4f7ef339

📥 Commits

Reviewing files that changed from the base of the PR and between 4e57d6f and c094d8e.

📒 Files selected for processing (15)
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/test.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-msf/src/lib.rs
  • rs/moq-mux/src/catalog/mod.rs
  • rs/moq-mux/src/catalog/msf_consumer.rs
  • rs/moq-mux/src/catalog/producer.rs
  • rs/moq-mux/src/export/fmp4.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/test/mod.rs

Comment thread rs/libmoq/src/publish.rs Outdated
Comment thread rs/libmoq/src/publish.rs Outdated
Comment thread rs/libmoq/src/publish.rs
@ksletmoe-aws ksletmoe-aws force-pushed the sletmoe/cmsf-unified-pipeline branch 2 times, most recently from 6249d3e to 152d470 Compare May 20, 2026 10:30
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
rs/moq-mux/src/catalog/msf_consumer.rs (1)

142-155: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve legacy/LOC init_data as decoder description during MSF→hang conversion.

Current conversion drops non-CMAF initialization bytes by hardcoding description: None, even though producer-side conversion writes legacy descriptions into init_data. That makes the roundtrip lossy and can break decoder setup for legacy tracks.

🔧 Suggested fix
+fn decode_legacy_description(track: &moq_msf::Track) -> anyhow::Result<Option<bytes::Bytes>> {
+	match track.packaging {
+		moq_msf::Packaging::Loc | moq_msf::Packaging::Legacy => track
+			.init_data
+			.as_ref()
+			.map(|b64| {
+				base64::engine::general_purpose::STANDARD
+					.decode(b64)
+					.map(Into::into)
+					.with_context(|| format!("MSF track {:?} has malformed init_data", track.name))
+			})
+			.transpose(),
+		_ => Ok(None),
+	}
+}
+
 fn video_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<VideoConfig>> {
 	let Some(container) = container_from_msf(track)? else {
 		return Ok(None);
 	};
@@
 	Ok(Some(VideoConfig {
 		codec,
-		description: None,
+		description: decode_legacy_description(track)?,
@@
 fn audio_config_from_msf(track: &moq_msf::Track) -> anyhow::Result<Option<AudioConfig>> {
 	let Some(container) = container_from_msf(track)? else {
 		return Ok(None);
 	};
@@
 	Ok(Some(AudioConfig {
@@
-		description: None,
+		description: decode_legacy_description(track)?,

Also applies to: 176-179, 224-225

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-mux/src/catalog/msf_consumer.rs` around lines 142 - 155, The
conversion currently discards legacy/LOC init_data by returning
Container::Legacy with description None; update container_from_msf (and the
analogous conversion spots noted) to preserve non-CMAF init_data by reading
track.init_data (base64-decode or keep raw as appropriate) and set it as the
decoder description field on the returned Container/decoder structure instead of
hardcoding None; ensure you reuse the same base64 decoding/with_context pattern
used for CMAF (track.init_data.as_ref() -> decode) and attach the decoded bytes
to Container::Legacy/decoder description so roundtrips retain legacy init bytes.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@rs/moq-mux/src/catalog/msf_consumer.rs`:
- Around line 142-155: The conversion currently discards legacy/LOC init_data by
returning Container::Legacy with description None; update container_from_msf
(and the analogous conversion spots noted) to preserve non-CMAF init_data by
reading track.init_data (base64-decode or keep raw as appropriate) and set it as
the decoder description field on the returned Container/decoder structure
instead of hardcoding None; ensure you reuse the same base64
decoding/with_context pattern used for CMAF (track.init_data.as_ref() -> decode)
and attach the decoded bytes to Container::Legacy/decoder description so
roundtrips retain legacy init bytes.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bfccbe2e-33d3-4bff-b5c5-4e7be4a737b7

📥 Commits

Reviewing files that changed from the base of the PR and between 6249d3e and 152d470.

📒 Files selected for processing (15)
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/test.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-msf/src/lib.rs
  • rs/moq-mux/src/catalog/mod.rs
  • rs/moq-mux/src/catalog/msf_consumer.rs
  • rs/moq-mux/src/catalog/producer.rs
  • rs/moq-mux/src/export/fmp4.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/test/mod.rs

@ksletmoe-aws ksletmoe-aws force-pushed the sletmoe/cmsf-unified-pipeline branch from 152d470 to de139f1 Compare May 20, 2026 15:38
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
rs/libmoq/src/publish.rs (1)

258-279: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

broadcast_start_group() still leaves the broadcast half-advanced on error.

Lines 268-276 mutate tracks one by one. If finishing or creating a group fails on a later track, earlier tracks have already rolled over and the broadcast no longer has a single aligned sequence. The doc comment acknowledges that partial state, but this API is supposed to provide broadcast-aligned groups; it should stage/rollback instead of leaving live state inconsistent.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/libmoq/src/publish.rs` around lines 258 - 279, broadcast_start_group
currently mutates each track as it goes (calling prev.finish() and
track.producer.create_group) so a later failure leaves earlier tracks advanced;
instead stage the work: first iterate tracks for the given broadcast_id and call
track.producer.create_group(sequence) but store results in a temporary map keyed
by track id (do not mutate track.group or call prev.finish yet), if any
create_group fails then finish() any newly created groups in the temp map and
return the error, otherwise loop tracks again and for each track call
prev.finish() (if present) and then set track.group = Some(temp_group) so the
swap is atomic from the broadcast's perspective; reference functions/fields:
broadcast_start_group, track.group, prev.finish(), track.producer.create_group,
and self.tracks.
🧹 Nitpick comments (1)
rs/moq-mux/src/import/test/mod.rs (1)

223-227: ⚡ Quick win

Wrap unbounded awaits with tokio::time::timeout to improve test reliability and prevent CI hangs.

These E2E tests use unbounded awaits on msf.next(), next_group(), and read_frame(). Adding timeouts prevents indefinite blocking on transport stalls or regressions.

Proposed test hardening
+use std::time::Duration;
+
@@
-	let catalog = msf
-		.next()
-		.await
+	let catalog = tokio::time::timeout(Duration::from_secs(2), msf.next())
+		.await
+		.expect("timed out waiting for MSF catalog")
 		.expect("MSF catalog should decode")
 		.expect("MSF catalog should be present");
@@
-	let mut group = video
-		.next_group()
-		.await
+	let mut group = tokio::time::timeout(Duration::from_secs(2), video.next_group())
+		.await
+		.expect("timed out waiting for next_group")
 		.expect("next_group should not error")
 		.expect("a group should be available");
@@
-	let frame = group
-		.read_frame()
-		.await
+	let frame = tokio::time::timeout(Duration::from_secs(2), group.read_frame())
+		.await
+		.expect("timed out waiting for frame")
 		.expect("read_frame should not error")
 		.expect("group should contain at least one fragment");
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@rs/moq-mux/src/import/test/mod.rs` around lines 223 - 227, The test currently
does unbounded awaits (e.g., msf.next(), next_group(), read_frame()) which can
hang CI; wrap each of these awaits with tokio::time::timeout using a sensible
Duration (e.g., a few seconds) and import tokio::time::timeout and
std::time::Duration, then assert on the Result to fail the test on timeout
(e.g., .expect("timed out waiting for msf.next()") or map the timeout error to a
test failure). Apply this to the catalog assignment using msf.next() and any
other occurrences of next_group() and read_frame() so the test fails fast
instead of blocking indefinitely.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rs/libmoq/src/publish.rs`:
- Around line 31-39: SlimTrack currently lacks a field to record whether a track
is using Mode A or Mode B, so Publish::broadcast_start_group and
Publish::group_open can create groups for the wrong mode; add an explicit
per-track enum field (e.g. mode: Option<Mode> where Mode = ModeA | ModeB) to
SlimTrack next to producer and group, set it when creating a Mode A or Mode B
group, and in Publish::broadcast_start_group and Publish::group_open check that
the existing mode either matches the requested mode or is None and reject
(return an error) immediately if the modes conflict before creating any
GroupProducer or calling TrackProducer APIs so a TrackProducer never receives
overlapping cross-mode lifecycles.
- Around line 196-247: The code currently calls broadcast.create_track(...)
before validating codec/config which can leave an orphan producer if
VideoCodec::from_str or AudioCodec::from_str fails; change the flow in
publish.rs so you first parse/construct the full VideoConfig or AudioConfig
(using VideoCodec::from_str and AudioCodec::from_str, validate
sample_rate/channel_count for audio, and determine container) and insert the
rendition into catalog while holding the catalog lock, and only after all
fallible validations succeed call broadcast.create_track(...), then insert the
resulting SlimTrack into self.tracks; ensure the order of operations around
broadcast.create_track, catalog.lock(), and self.tracks.insert matches this
sequence to avoid creating a producer before validation completes.

---

Duplicate comments:
In `@rs/libmoq/src/publish.rs`:
- Around line 258-279: broadcast_start_group currently mutates each track as it
goes (calling prev.finish() and track.producer.create_group) so a later failure
leaves earlier tracks advanced; instead stage the work: first iterate tracks for
the given broadcast_id and call track.producer.create_group(sequence) but store
results in a temporary map keyed by track id (do not mutate track.group or call
prev.finish yet), if any create_group fails then finish() any newly created
groups in the temp map and return the error, otherwise loop tracks again and for
each track call prev.finish() (if present) and then set track.group =
Some(temp_group) so the swap is atomic from the broadcast's perspective;
reference functions/fields: broadcast_start_group, track.group, prev.finish(),
track.producer.create_group, and self.tracks.

---

Nitpick comments:
In `@rs/moq-mux/src/import/test/mod.rs`:
- Around line 223-227: The test currently does unbounded awaits (e.g.,
msf.next(), next_group(), read_frame()) which can hang CI; wrap each of these
awaits with tokio::time::timeout using a sensible Duration (e.g., a few seconds)
and import tokio::time::timeout and std::time::Duration, then assert on the
Result to fail the test on timeout (e.g., .expect("timed out waiting for
msf.next()") or map the timeout error to a test failure). Apply this to the
catalog assignment using msf.next() and any other occurrences of next_group()
and read_frame() so the test fails fast instead of blocking indefinitely.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d9faf3e0-ab14-4ff5-91fe-9240122bbd87

📥 Commits

Reviewing files that changed from the base of the PR and between 152d470 and de139f1.

📒 Files selected for processing (15)
  • js/watch/src/audio/decoder.ts
  • js/watch/src/video/decoder.ts
  • rs/libmoq/src/api.rs
  • rs/libmoq/src/error.rs
  • rs/libmoq/src/id.rs
  • rs/libmoq/src/publish.rs
  • rs/libmoq/src/test.rs
  • rs/moq-cli/src/subscribe.rs
  • rs/moq-msf/src/lib.rs
  • rs/moq-mux/src/catalog/mod.rs
  • rs/moq-mux/src/catalog/msf_consumer.rs
  • rs/moq-mux/src/catalog/producer.rs
  • rs/moq-mux/src/export/fmp4.rs
  • rs/moq-mux/src/import/fmp4.rs
  • rs/moq-mux/src/import/test/mod.rs

Comment thread rs/libmoq/src/publish.rs
Comment thread rs/libmoq/src/publish.rs
Unify CMSF support into the existing hang/CMAF pipeline, replacing the
parallel implementation approach from PR moq-dev#1408 with targeted extensions.

MSF catalog support (subscribe side):
- Add SAP and jitter fields to moq_msf::Track
- Add MsfConsumer that parses MSF catalogs and converts to hang::Catalog
- Add --catalog msf flag to moq-cli subscribe
- Wire SAP type derivation and jitter into to_msf() on publish side

Explicit group mode (multi-encoder redundancy):
- Add with_explicit_groups() and start_group(sequence) to Fmp4 importer
- Caller-controlled group boundaries for all tracks simultaneously

Slim C API for CMAF passthrough:
- moq_track_create with versioned moq_track_config struct
- Mode B: moq_broadcast_start_group + moq_broadcast_write (aligned groups)
- Mode A: moq_group_open + moq_group_write + moq_group_close (per-track)
- moq_track_close with auto-finish of open groups
- Orphaned Mode A groups auto-reaped on track/broadcast close
- No CMAF muxing in the C API; caller provides pre-built moof+mdat

JS decoder improvement:
- Video/audio decoders fall back to init segment description when catalog
  lacks one, enabling playback of CMSF streams without explicit description
- supported() probes also use init segment fallback with try-catch safety

Supersedes PR moq-dev#1408.
@ksletmoe-aws ksletmoe-aws force-pushed the sletmoe/cmsf-unified-pipeline branch from de139f1 to 5cba5c0 Compare May 20, 2026 16:26
Copy link
Copy Markdown
Collaborator

@kixelated kixelated left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MUCH better than the previous PR.

A few API nits. The libmoq API is the most concerning to me. I want to keep it simple, and I think we can do that with the existing write API + an optional start_group call.

try {
description = Container.Cmaf.decodeInitSegment(base64ToBytes(config.container.init)).description;
} catch {
// Malformed init segment; let isConfigSupported decide without description.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should warn and return return false.

let mut fmp4 = moq_mux::export::Fmp4::new(self.broadcast)?.with_latency(self.args.max_latency);
let mut fmp4 = match self.args.catalog {
CatalogFormat::Hang => moq_mux::export::Fmp4::new(self.broadcast)?,
CatalogFormat::Msf => moq_mux::export::Fmp4::new_msf(self.broadcast)?,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the second argument should be the CatalogFormat instead.

fn video_sap_type(codec: &hang::catalog::VideoCodec) -> Option<u8> {
use hang::catalog::VideoCodec;
match codec {
VideoCodec::H264(_) | VideoCodec::VP8 | VideoCodec::AV1(_) | VideoCodec::VP9(_) => Some(1),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be 2 for b-frames.

use hang::catalog::VideoCodec;
match codec {
VideoCodec::H264(_) | VideoCodec::VP8 | VideoCodec::AV1(_) | VideoCodec::VP9(_) => Some(1),
VideoCodec::H265(_) => Some(3),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't support this at all right now... I'd rather not advertise it until then.

/// Audio fragments accumulate in the current group until `start_group` is called.
/// This means late-joining subscribers must buffer the entire group before playback.
/// For latency-sensitive non-redundancy use cases, use the default auto mode instead.
pub fn with_explicit_groups(mut self) -> Self {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we need this option. I'm fine with start_group, and we should only close the existing group if there's at least 1 frame (on keyframe).

const initSegment = base64ToBytes(this.config.container.init);
const init = Container.Cmaf.decodeInitSegment(initSegment);
const description = this.config.description ? Util.Hex.toBytes(this.config.description) : undefined;
const description = this.config.description ? Util.Hex.toBytes(this.config.description) : init.description;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: I want to get rid of this.config.description and have each container handle description.

What you have is correct.

Comment thread rs/libmoq/src/api.rs
Comment on lines +697 to +698
/// codec strings that are not recognized, the implementation falls back to
/// inspecting the `width`/`height` and `sample_rate`/`channel_count` fields
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to make this stuff automatic if possible. (almost) all of this information should be in CMAF already, so it should get parsed out via the CMAF import. We can only shoot ourselves in the foot if we allow users to contradict the moov.

Comment thread rs/libmoq/src/api.rs
///
/// # Group lifecycle
/// Each track supports one of two mutually exclusive modes:
/// - **Mode A** (per track groups): groups are managed via [`moq_group_open`]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah see my later comment, I'm not sure we need to make this a hard switch. start_group could be optional.

@ksletmoe-aws
Copy link
Copy Markdown
Contributor Author

Cool, will take a look at the comments and get a new revision up. Thanks!

@kixelated
Copy link
Copy Markdown
Collaborator

Cool, will take a look at the comments and get a new revision up. Thanks!

I can make the API changes too, just lemme know.

kixelated added a commit that referenced this pull request May 22, 2026
Co-authored-by: sletmoe <sletmoe@amazon.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@moq-bot moq-bot Bot mentioned this pull request May 22, 2026
@kixelated
Copy link
Copy Markdown
Collaborator

I merged most of this via #1444

I'm not sure about adding an advanced splitting API to libmoq though. @ksletmoe-aws we should chat about your use-case and how we could address it.

@kixelated kixelated closed this May 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants