Skip to content

feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396

Open
metapox wants to merge 3 commits into
moq-dev:mainfrom
metapox:feat/subscribe-update-js
Open

feat(lite): implement SUBSCRIBE_UPDATE API for JS subscriber and publisher#1396
metapox wants to merge 3 commits into
moq-dev:mainfrom
metapox:feat/subscribe-update-js

Conversation

@metapox
Copy link
Copy Markdown

@metapox metapox commented May 10, 2026

Closes #1363

Problem

The JS subscriber has no API to change priority after subscribing. The only option is to close and re-subscribe, which causes the relay to replay cached groups.

Changes

  • track.ts: Add updatePriority(priority) method that updates an internal Signal
  • subscriber.ts: After SubscribeOk, watch the priority Signal and send SubscribeUpdate when it changes
  • publisher.ts: Apply received SubscribeUpdate priority to the track (implements existing TODO)

Usage

const track = broadcast.subscribe("video", 200);
// Later, change priority without re-subscribing:
track.updatePriority(0);

Context

I'm building moq-multicam, a multi-camera streaming system. Camera switching needs instant priority changes without re-subscribe to avoid replaying stale groups from the relay cache.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 10, 2026

Review Change Stack

Walkthrough

This PR implements live track priority updates by adding a bidirectional subscriber-publisher protocol. The Signal class gains a next() utility method that returns a Promise resolving on signal changes. TrackState adds a priority signal field, and Track exposes an updatePriority(priority: number) method. The Subscriber imports SubscribeUpdate and spawns a priorityWatch loop that observes the track's priority signal and sends SubscribeUpdate frames on the subscription stream. The Publisher receives these SubscribeUpdate messages, logs them, and applies the updated priority to the track via updatePriority().

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically describes the main change: implementing SUBSCRIBE_UPDATE API for JS subscriber and publisher, which matches the changeset perfectly.
Description check ✅ Passed The description is directly related to the changeset, explaining the problem, changes made to each file, and a usage example that aligns with the code modifications.
Linked Issues check ✅ Passed The PR fully implements all coding objectives from issue #1363: adds priority Signal and updatePriority() method to Track [#1363], implements priority watching and SUBSCRIBE_UPDATE sending in Subscriber [#1363], and applies received SubscribeUpdate priority in Publisher [#1363].
Out of Scope Changes check ✅ Passed The Signal.next() method added to @moq/signals is a supporting change that enables cleaner implementation of the priority-watching pattern, directly serving the SUBSCRIBE_UPDATE feature.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
✨ Simplify code
  • Create PR with simplified code

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@js/lite/src/lite/subscriber.ts`:
- Around line 193-209: The priorityWatch loop in the priority watcher (the async
IIFE named priorityWatch) must be rewritten to avoid listener leaks and missed
updates: before attaching a listener via
request.track.state.priority.changed(resolve) first check the current priority
snapshot (e.g., read the priority getter/value) so an update that happened
already is handled immediately; if you attach a listener ensure its
dispose/unsubscribe is always called in every path (use try/finally around the
awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 14817bcb-af2f-4924-8e95-029b6bca2793

📥 Commits

Reviewing files that changed from the base of the PR and between 6adf3c3 and 8f672bf.

📒 Files selected for processing (3)
  • js/lite/src/lite/publisher.ts
  • js/lite/src/lite/subscriber.ts
  • js/lite/src/track.ts

Comment on lines +193 to +209
const priorityWatch = (async () => {
for (;;) {
let dispose!: () => void;
const priority = await new Promise<number | undefined>((resolve) => {
dispose = request.track.state.priority.changed(resolve);
});
if (priority === undefined) {
dispose();
continue;
}
const update = new SubscribeUpdate({ priority });
await update.encode(stream.writer, this.version);
console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
}
})();

await Promise.race([stream.reader.closed, request.track.closed, priorityWatch]);
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot May 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In the latest @moq/signals API, does Signal.changed(callback) remain subscribed until the returned dispose function is called?

💡 Result:

Yes. In the latest @moq/signals API, calling Signal.changed(callback) returns a “dispose” function, and the subscription stays active until that dispose function is called. This is consistent with the library’s standard subscribe pattern: adapters/hooks in the repo use a subscription callback that returns an unsubscribe/dispose function to React’s useSyncExternalStore subscription mechanism (i.e., subscription lifetime is tied to the returned disposer) [1]. The docs site for @moq/signals also emphasizes the explicit subscription mechanism used by effects, rather than implicit global listeners, aligning with disposer-controlled teardown [2] [3].

Citations:


🏁 Script executed:

# First, find the file to confirm it exists
find . -name "subscriber.ts" -path "*/lite/*" | head -20

Repository: moq-dev/moq

Length of output: 121


🏁 Script executed:

# Examine the file around lines 193-209 to verify the issue
sed -n '180,220p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 1877


🏁 Script executed:

# Check for Signal.changed usage elsewhere in the file to understand the pattern
rg "Signal.changed\|changed\(" ./js/lite/src/lite/subscriber.ts -B 2 -A 2

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

# Also check the StateSignal or Signal type definition to understand its behavior
fd -e ts -e tsx -path "*signals*" | head -10

Repository: moq-dev/moq

Length of output: 224


🏁 Script executed:

# Search for Signal or state.priority definitions to understand the changed() behavior
rg "state\.priority|\.changed\(" ./js/lite/src/lite/subscriber.ts -B 1 -A 1

Repository: moq-dev/moq

Length of output: 202


🏁 Script executed:

# Find the Signal type definition in the `@moq/signals` library
find . -name "*.ts" -o -name "*.tsx" | xargs grep -l "changed.*\(.*resolve" | head -5

Repository: moq-dev/moq

Length of output: 79


🏁 Script executed:

# Check what happens when priorityWatch promise resolves - verify if dispose is called
sed -n '30,35p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 197


🏁 Script executed:

# Search for peek() method usage to understand the pattern for checking current state
rg "\.peek\(" ./js/lite/src/ | head -5

Repository: moq-dev/moq

Length of output: 438


🏁 Script executed:

# Verify the exact control flow - check what happens after the Promise.race on line 30
sed -n '190,240p' ./js/lite/src/lite/subscriber.ts | cat -n

Repository: moq-dev/moq

Length of output: 2035


🏁 Script executed:

# Search for how updatePriority is called to understand if early updates are possible
rg "updatePriority" ./js/lite/src/lite/ -B 2 -A 2

Repository: moq-dev/moq

Length of output: 431


Fix priority watcher lifecycle to prevent listener buildup and dropped updates.

The current implementation has two critical issues:

  1. Listener accumulation: On line 197, each loop iteration registers a new listener via request.track.state.priority.changed(resolve), but disposal only happens on the undefined path (line 200). When priority !== undefined, the listener is never disposed, causing accumulation with each update.

  2. Dropped early updates: If updatePriority() is called before the listener starts on line 197, that change is missed. The listener only captures changes after registration.

  3. Resource leak on close: When Promise.race() resolves from stream or track closure (line 209), the priorityWatch loop never exits, leaving the final listener active indefinitely.

Proposed fix
 			// Watch for priority changes and send SUBSCRIBE_UPDATE.
+			const sendPriorityUpdate = async (priority: number) => {
+				const update = new SubscribeUpdate({ priority });
+				await update.encode(stream.writer, this.version);
+				console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`);
+			};
+
+			// Flush any priority set before the watcher starts.
+			const pendingPriority = request.track.state.priority.peek();
+			if (pendingPriority !== undefined && pendingPriority !== request.priority) {
+				await sendPriorityUpdate(pendingPriority);
+			}
+
 			const priorityWatch = (async () => {
+				const stop = Symbol("stop");
 				for (;;) {
 					let dispose!: () => void;
-					const priority = await new Promise<number | undefined>((resolve) => {
+					const priority = await Promise.race([
+						new Promise<number | undefined>((resolve) => {
+							dispose = request.track.state.priority.changed(resolve);
+						}),
+						Promise.race([stream.reader.closed, request.track.closed]).then(() => stop),
+					]);
+					dispose();
+
+					if (priority === stop) break;
+					if (priority === undefined) continue;
+					await sendPriorityUpdate(priority);
-						dispose = request.track.state.priority.changed(resolve);
-					});
-					if (priority === undefined) {
-						dispose();
-						continue;
-					}
-					const update = new SubscribeUpdate({ priority });
-					await update.encode(stream.writer, this.version);
-					console.debug(`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`;
 				}
 			})();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@js/lite/src/lite/subscriber.ts` around lines 193 - 209, The priorityWatch
loop in the priority watcher (the async IIFE named priorityWatch) must be
rewritten to avoid listener leaks and missed updates: before attaching a
listener via request.track.state.priority.changed(resolve) first check the
current priority snapshot (e.g., read the priority getter/value) so an update
that happened already is handled immediately; if you attach a listener ensure
its dispose/unsubscribe is always called in every path (use try/finally around
the awaited Promise) so listeners are not accumulated; and make the loop
cancellable/terminating when stream.reader.closed or request.track.closed
resolves (for example pass an AbortSignal tied to those closures or break the
loop when either is closed) so the pending listener is cleaned up on shutdown.
Apply changes inside the priorityWatch IIFE referencing
request.track.state.priority.changed, updatePriority handling, SubscribeUpdate
encoding, and the Promise.race with stream.reader.closed and
request.track.closed.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if the first issue is real; I think dispose only needs to be called if the callback hasn't been fired yet.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and examine the Signal.changed() implementation in `@moq/signals`
fd -e ts signal.ts --full-path | head -10
rg -n "changed" js/signals/src/ -A 5 -B 2

Length of output: 4535


@kixelated, you're right — I apologize for the incorrect analysis. Looking at the actual Signal.changed() implementation in js/signals/src/index.ts:

changed(fn: (value: T) => void): Dispose {
    this.#changed.add(fn);
    return () => this.#changed.delete(fn);
}

And the flush logic (lines 109–126):

const changed = this.#changed;
this.#changed = new Set();   // ← set is swapped out before callbacks run

for (const fn of changed) {
    fn(value);               // callback invoked from the old set
}

The #changed set is replaced with a fresh empty Set before any callbacks are invoked. So once the callback fires and resolve is called, the listener has already been evicted from #changed. Calling dispose() afterward does this.#changed.delete(fn) on the new empty set — a no-op. There is no listener accumulation. Your understanding is correct: dispose() only needs to be called if the callback hasn't fired yet (e.g., to cancel a pending listener on early teardown), which is exactly what the try/finally in the current file content handles.


🧬 Code Graph Analysis Results

Snippet 1: priorities update disposal in Subscriber.runSubscribe

  • Filename: js/lite/src/lite/subscriber.ts

  • Approximate location: within the #runSubscribe method (priority watch block)

  • Contents (complete as in the relevant code block):

    const priorityWatch = (async () => {
    for (;;) {
    let dispose!: () => void;
    try {
    const priority = await new Promise<number | undefined>((resolve) => {
    dispose = request.track.state.priority.changed(resolve);
    });
    if (priority === undefined) continue;
    const update = new SubscribeUpdate({ priority });
    await update.encode(stream.writer, this.version);
    console.debug(
    subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority},
    );
    } finally {
    dispose();
    }
    }
    })();

  • Purpose of snippet:
    Demonstrates how the code subscribes to a per-track priority signal, obtains a disposer from the signal, waits for a priority change (or undefined), and then issues a SUBSCRIBE_UPDATE. The disposer is invoked in a finally block to unregister the listener.

Notes for reviewer:

  • The user’s comment asks whether dispose should be called only if the callback hasn’t fired yet. This snippet shows the current pattern: dispose() is invoked unconditionally in finally, regardless of whether the priority callback already fired. For clarity, see the accompanying pattern in the same block where the disposer is obtained from request.track.state.priority.changed and then disposed in finally. If the callback has already fired, the disposer’s behavior depends on the implementation of the signal’s disposer (often it is idempotent, but that isn’t guaranteed by all signal implementations).

✏️ Learnings added
Learnt from: kixelated
Repo: moq-dev/moq PR: 1396
File: js/lite/src/lite/subscriber.ts:193-212
Timestamp: 2026-05-15T16:52:56.228Z
Learning: In `js/signals/src/index.ts`, `Signal.changed(fn)` is a **one-time** listener. During flush, `this.#changed` is replaced with a new empty Set before iterating the old set and firing callbacks. This means calling `dispose()` after the callback has already fired is always a no-op — the listener is auto-removed on firing. Listener accumulation is not a concern; `dispose()` is only needed to cancel a still-pending (unfired) listener.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.

@metapox metapox force-pushed the feat/subscribe-update-js branch from 8f672bf to 83d79b4 Compare May 11, 2026 08:20
…isher

- Track: add priority Signal and updatePriority() method
- Subscriber: watch for priority changes after SubscribeOk, send SubscribeUpdate
- Publisher: apply received SubscribeUpdate priority to track (was TODO)
@metapox metapox force-pushed the feat/subscribe-update-js branch from 83d79b4 to a08dcb3 Compare May 11, 2026 08:31
Comment thread js/lite/src/lite/subscriber.ts Outdated
Comment on lines +197 to +199
const priority = await new Promise<number | undefined>((resolve) => {
dispose = request.track.state.priority.changed(resolve);
});
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, but feels gross. Open to suggestions to improve moq/signals too.

kixelated and others added 2 commits May 22, 2026 14:17
Address PR review feedback:

- Add Signal.next() to @moq/signals so callers can await the next value
  without the manual `let dispose!; try { await new Promise(...) } finally
  { dispose() }` pattern. The internal listener is already auto-removed
  when fired, so dispose() in the happy path is a no-op.
- Use Signal.next() in the subscriber's priority watcher, removing the
  awkward dispose dance.
- Wrap the SUBSCRIBE_UPDATE debug log in publisher.ts so biome's
  formatter is happy (this was the CI failure).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
js/net/src/lite/subscriber.ts (1)

195-208: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Priority watcher is not cancellable and can miss pre-existing priority changes.

On Line 198, next() has no dispose path, so if Line 208 resolves from stream.reader.closed / request.track.closed first, the pending wait can outlive the subscription. Also, a priority already set before the loop starts won’t be sent.

Proposed fix
 			// Watch for priority changes and send SUBSCRIBE_UPDATE.
+			const sendPriorityUpdate = async (priority: number) => {
+				const update = new SubscribeUpdate({ priority });
+				await update.encode(stream.writer, this.version);
+				console.debug(
+					`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`,
+				);
+			};
+
+			// Flush a priority that may have changed before watcher startup.
+			const initialPriority = request.track.state.priority.peek();
+			if (initialPriority !== undefined && initialPriority !== request.priority) {
+				await sendPriorityUpdate(initialPriority);
+			}
+
 			const priorityWatch = (async () => {
+				const stop = Symbol("stop");
 				for (;;) {
-					const priority = await request.track.state.priority.next();
+					let dispose!: () => void;
+					const priority = await Promise.race([
+						new Promise<number | undefined>((resolve) => {
+							dispose = request.track.state.priority.changed(resolve);
+						}),
+						Promise.race([stream.reader.closed, request.track.closed]).then(() => stop),
+					]);
+					dispose();
+
+					if (priority === stop) break;
 					if (priority === undefined) continue;
-					const update = new SubscribeUpdate({ priority });
-					await update.encode(stream.writer, this.version);
-					console.debug(
-						`subscribe update: id=${id} broadcast=${broadcast} track=${request.track.name} priority=${priority}`,
-					);
+					await sendPriorityUpdate(priority);
 				}
 			})();
#!/bin/bash
# Verify whether Track.close() guarantees a priority signal transition
# and inspect Signal.next()/changed cancellation behavior.
rg -n -C4 --type=ts '\bclass\s+Track\b|close\s*\(|state\.priority|updatePriority\s*\(' js/net/src/track.ts
rg -n -C4 --type=ts '\bnext\s*\(\)\s*:\s*Promise<|changed\s*\(' js/signals/src/index.ts
rg -n -C4 --type=ts 'priorityWatch|state\.priority\.(next|changed)\(' js/net/src/lite/subscriber.ts
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@js/net/src/lite/subscriber.ts` around lines 195 - 208, The priority watcher
(priorityWatch) must be made cancellable and must emit any currently-set
priority immediately: before entering the loop, read the current priority value
(e.g., via request.track.state.priority.changed/get or equivalent) and if
defined build a SubscribeUpdate and call SubscribeUpdate.encode(stream.writer,
this.version); then replace the infinite for(;;) awaiting
request.track.state.priority.next() with a loop that races each next() against a
cancellation promise composed from stream.reader.closed and request.track.closed
(or use an AbortController tied to those closures) so the wait is aborted when
the subscription ends; ensure you break out of the loop and stop sending updates
when the cancellation resolves. Use the symbols priorityWatch,
request.track.state.priority.next (or .changed/.get), SubscribeUpdate.encode,
stream.reader.closed and request.track.closed to locate and update the code.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In `@js/net/src/lite/subscriber.ts`:
- Around line 195-208: The priority watcher (priorityWatch) must be made
cancellable and must emit any currently-set priority immediately: before
entering the loop, read the current priority value (e.g., via
request.track.state.priority.changed/get or equivalent) and if defined build a
SubscribeUpdate and call SubscribeUpdate.encode(stream.writer, this.version);
then replace the infinite for(;;) awaiting request.track.state.priority.next()
with a loop that races each next() against a cancellation promise composed from
stream.reader.closed and request.track.closed (or use an AbortController tied to
those closures) so the wait is aborted when the subscription ends; ensure you
break out of the loop and stop sending updates when the cancellation resolves.
Use the symbols priorityWatch, request.track.state.priority.next (or
.changed/.get), SubscribeUpdate.encode, stream.reader.closed and
request.track.closed to locate and update the code.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b27ea04f-b744-4e11-8ec2-14b8d422a045

📥 Commits

Reviewing files that changed from the base of the PR and between a08dcb3 and e070e4e.

📒 Files selected for processing (4)
  • js/net/src/lite/publisher.ts
  • js/net/src/lite/subscriber.ts
  • js/net/src/track.ts
  • js/signals/src/index.ts
💤 Files with no reviewable changes (1)
  • js/net/src/track.ts

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(lite): JS Subscriber lacks SUBSCRIBE_UPDATE support for dynamic priority changes

2 participants