Add per-track timescale and frame timestamps to moq-lite#1439
Add per-track timescale and frame timestamps to moq-lite#1439kixelated wants to merge 7 commits into
Conversation
Replace the const-generic Timescale<SCALE> with a single Timestamp type that carries scale as a runtime field, so timescales can be negotiated per-track. Adds sentinel ZERO/MAX (scale 0) plus checked_add_delta and checked_delta_from helpers for the upcoming moq-lite zigzag delta wire format. Updates hang, moq-mux, libmoq, moq-ffi to use the new constructors. Catalog jitter switches from moq_net::Time to Option<moq_net::Timestamp>. This is the foundational refactor; wire-format changes and async subscribe come in follow-up commits. https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB
Each Track now carries a timescale (units per second) and each Frame carries a presentation Timestamp in that timescale. Both default to the unspecified sentinel (scale 0) when not negotiated. Adds VarInt::from_zigzag / to_zigzag for the upcoming moq-lite delta-encoded timestamp wire format. Inputs must fit in i61 (the 62-bit varint range after zigzag), which is ample for frame-to-frame deltas. Updates internal struct-literal call sites across hang, moq-mux, libmoq, moq-ffi, moq-relay, moq-clock to include the new fields with sensible defaults (0 = unspecified). No wire-format changes yet. https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB
Adds moq-lite version Lite05 (ALPN moq-lite-05, code 0xff0dad05): - SUBSCRIBE_OK carries the per-track timescale (varint) so the subscriber learns it at session setup. Older versions decode as 0 (unspecified). - Per-frame wire header gains a zigzag-encoded signed delta from the previous frame's timestamp on the same group stream, before the frame size. Negative deltas support B-frames. - The publisher verifies frame timestamps match the track's negotiated timescale and aborts the connection with ProtocolViolation otherwise. TrackProducer gains a set_timescale method that updates both the local Track info and a new conducer State.timescale, plumbing the value through to consumers. TrackConsumer exposes timescale_now/poll_timescale/timescale to read the resolved value. hang and moq-mux are updated to stamp the moq-net Frame.timestamp on every frame they emit and to set Track.timescale = 1_000_000 (microseconds) on tracks they produce. The legacy container-level timestamp prefix remains as a duplicate during the transition. https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB
BroadcastConsumer::subscribe_track is now async and resolves after the session layer publishes the negotiated timescale (via SUBSCRIBE_OK on moq-lite Lite05+, or 0 for older versions and for IETF until LOC track properties land). Subscribers waiting on the resolved timescale via TrackConsumer::timescale see it before the future completes. subscribe_track_immediate keeps the synchronous semantics for callers inside non-async contexts (FFI, internal moq-net publishers serving a local broadcast) where the timescale either isn't relevant or is already known. The IETF subscriber now also sets timescale = 0 explicitly when SUBSCRIBE_OK arrives, so the async subscribe future resolves on older drafts that don't yet carry LOC track properties. https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB
Follow-up to making BroadcastConsumer::subscribe_track async. https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB
| let video_track = moq_net::Track { | ||
| name: "video".to_string(), | ||
| priority: 1, // Video typically has lower priority than audio | ||
| timescale: 1_000_000, |
There was a problem hiding this comment.
Can you make a Timescale type? Timescale::MICRO is a lot nicer.
| moq_net::Track { | ||
| name: Catalog::DEFAULT_NAME.to_string(), | ||
| priority: 100, | ||
| timescale: 0, |
There was a problem hiding this comment.
Timescale::UNKNOWN
| pub fn encode(&self, group: &mut moq_net::GroupProducer) -> Result<(), Error> { | ||
| let mut header = BytesMut::new(); | ||
| self.timestamp.encode(&mut header).map_err(moq_net::Error::from)?; | ||
| self.timestamp.encode_value(&mut header).map_err(moq_net::Error::from)?; |
There was a problem hiding this comment.
I'd rather do something like self.timestamp.value.encode or something
| let size = header.len() + self.payload.len(); | ||
|
|
||
| let mut chunked = group.create_frame(size.into())?; | ||
| let net_frame = moq_net::Frame::new(size as u64).with_timestamp(self.timestamp); |
There was a problem hiding this comment.
I think you remove Frame::new and require using the struct constructor instead. timestamp shouldn't be optional, builder doesn't make sense.
| /// Decode a frame from raw bytes (VarInt timestamp prefix + payload). | ||
| pub fn decode(mut buf: impl Buf) -> Result<Self, Error> { | ||
| let timestamp = Timestamp::decode(&mut buf)?; | ||
| let timestamp = Timestamp::decode_value(&mut buf, TIMESCALE)?; |
There was a problem hiding this comment.
I think we decode the value separately (via u64::decode) then call Timestamp::new
There was a problem hiding this comment.
Or in this case, Timestamp::from_micros(value)
| let track = self.inner.subscribe_track_immediate(&moq_net::Track { | ||
| name, | ||
| priority: 0, | ||
| timescale: 0, |
There was a problem hiding this comment.
We need to change subscribe_track so it only takes a &str name instead of a Track. The timescale and priority arrives in the SUBSCRIBE_OK instead. To allow subscriber priorities, we should add a Delivery struct in the future for overrides, but we need to start splitting track properties (set by the publisher) and track requests (set by the subscriber)
| let cts = entry.cts.unwrap_or_default() as i64; | ||
| let pts = dts.checked_add_signed(cts).ok_or(Error::PtsOverflow)?; | ||
| let timestamp = Timestamp::from_scale(pts, timescale)?; | ||
| let timestamp = Timestamp::from_scale(pts, timescale, crate::container::TIMESCALE)?; |
There was a problem hiding this comment.
Kinda confusing. Maybe make a .convert method or something to change timescale units? Timestamp::new(pts, timescale).convert(crate::container::TIMESCALE)
| /// as the catalog changes. | ||
| pub fn new(broadcast: moq_net::BroadcastConsumer) -> Result<Self, crate::Error> { | ||
| let catalog_track = broadcast.subscribe_track(&hang::Catalog::default_track())?; | ||
| let catalog_track = broadcast.subscribe_track_immediate(&hang::Catalog::default_track())?; |
There was a problem hiding this comment.
Cant do this in the constructor any longer. please stop using subscribe_track_immediate, we need to fix the API.
| config: AacConfig, | ||
| ) -> anyhow::Result<Self> { | ||
| let track = broadcast.unique_track(".aac")?; | ||
| let mut track = broadcast.unique_track(".aac")?; |
There was a problem hiding this comment.
Need to improve this API. What about the priority too? unique_track should probably return a String or something?
I really don't like .set_timescale. Publisher track properties like the name/priority/timescale should be fixed. Allowing them to be updated can only cause pain.
|
|
||
| /// Wait until the track's timescale has been negotiated (e.g. via SUBSCRIBE_OK | ||
| /// for subscribers, or set immediately by publishers). | ||
| pub async fn timescale(&self) -> Result<u64> { |
There was a problem hiding this comment.
Please no. Return a TrackConsumer only when we get the SUBSCRIBE_OK.
…cale type Addresses PR #1439 review: - Add `Timescale` newtype with named constants (UNKNOWN/SECOND/MILLI/MICRO/NANO); replace raw `u64` timescale fields and method args throughout - `BroadcastConsumer::subscribe_track(name, Subscription)` is now `async` and resolves on SUBSCRIBE_OK with the publisher's authoritative Track properties (priority, timescale). `subscribe_track_immediate` is gone - Add `Subscription` for subscriber-side preferences (priority, timeout), separated from the publisher's immutable Track. Add aggregation methods `TrackProducer::max_priority` / `max_timeout` and an updatable `TrackConsumer::update_subscription`. This is the API surface the future fetch path will plug into - Make Track properties immutable on `TrackProducer`: remove `set_timescale`, fold `unique_track` into `unique_name` so callers construct the full Track themselves - Introduce `TrackRequest` so the dynamic broadcast flow yields a request the publisher fulfills via `accept(Track)`, completing the async resolution - Remove `Frame::new` and `with_timestamp`; require struct construction; keep size-only `From` impls - Hang container frame now uses `VarInt::encode_quic`/`decode_quic` + the `Timestamp::value()` accessor for explicit timestamp varint coding - Replace `Timestamp::from_scale` callsites with `Timestamp::new(...).convert(...)` Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Brings in the stats, LOC, MKV, MSF, fragment-duration, and async-CMSF
changes from main alongside the async-subscribe / immutable-track work on
this branch.
Conflict resolution highlights:
- lite::publisher / lite::subscriber: keep the TrackRequest-based async
subscribe flow on the subscriber side and threading the publisher's
Track properties through SUBSCRIBE_OK, plus the new StatsHandle
bookkeeping from main.
- moq-mux::export::Mkv / Fmp4: rewrite per-track subscribe to drive
in-flight subscribe_track futures via FuturesUnordered polled with the
conducer waiter's Waker, so a paused-time runtime still completes them
before time advances. Defer header emission until every catalog-listed
track is resolved.
- moq-mux::import::mkv: switch unique_track to unique_name +
create_track(Track {..}) with explicit timescale.
- catalog::msf_consumer / producer: moq_net::Time → moq_net::Timestamp,
as_millis() now returns Result so jitter conversion gets and_then'd.
- container::loc: Timestamp::from_scale(value, src) → Timestamp::new +
convert; create_frame takes Into<Frame> so the usize size arg works.
- hang::container::Frame::encode: normalize timestamps to the container
TIMESCALE on the wire so producers using a different source scale
(e.g. nanoseconds from MKV) round-trip correctly via Timestamp::from_micros.
Also exposes Waiter::waker() for downstream code that needs a Waker to
drive sub-futures inside a conducer poll loop.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This change introduces per-track timescale negotiation and per-frame timestamps to the moq-lite protocol, enabling accurate media synchronization across different frame rates and sample rates.
Summary
Refactors the timestamp system from a generic
Timescale<const SCALE: u64>type alias to a concreteTimestampstruct that carries both a raw value and its scale. This allows timestamps to be expressed in any unit (seconds, milliseconds, microseconds, nanoseconds) and enables the protocol to negotiate per-track timescales via SUBSCRIBE_OK messages.Key Changes
Timestamp struct redesign: Replaced
Timescale<SCALE>type alias with a concreteTimestampstruct containingvalue: VarIntandscale: u64. The scale is now carried per-timestamp instance rather than as a compile-time constant.Unspecified scale handling:
scale == 0denotes an unspecified timescale (used byTimestamp::ZEROandTimestamp::MAXsentinels). Unit conversions and arithmetic against unspecified scales returnTimeOverflowto prevent divide-by-zero errors.Scale conversion API: Added
Timestamp::convert()andTimestamp::as_scale()methods for explicit scale conversions. Allas_*methods (as_secs, as_millis, etc.) now returnResult<T, TimeOverflow>instead of infallible values.Signed delta encoding: Added
Timestamp::checked_add_delta()andTimestamp::checked_delta_from()for zigzag-encoded signed deltas used by moq-lite per-frame delta decoding (for B-frames).Protocol integration:
timescale: u64field toSubscribeOkmessage (Lite05+)timescale: u64field toTrackstructtimestamp: Timestampfield toFramestructVersion::Lite05to indicate timestamp support on the wireVarInt zigzag support: Added
VarInt::from_zigzag()andVarInt::to_zigzag()for signed integer encoding used in delta timestamps.Ordering and comparison: Implemented
OrdandPartialOrdforTimestampwith debug assertions for scale compatibility. Cross-scale comparisons are allowed only when one side is a scale-0 sentinel.Removed PartialOrd/Ord from Timescale: The old generic type no longer derives these traits; the new
Timestampimplements them with scale-aware semantics.Implementation Details
Timestamp::now()returns microsecond-scale timestamps anchored to a jittered wall-clock reference.checked_add,checked_sub) require matching scales and returnTimeOverflowon mismatch.Debugimpl chooses the largest unit (seconds, milliseconds, microseconds, nanoseconds) where the value has no fractional part.Compatibility
scale == 0(unspecified)https://claude.ai/code/session_01Wy7egQjXvfPCKovs3mWANB