diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index 336837a..710ee66 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -125,7 +125,7 @@ jobs: run: | mkdir -p dist - # Create .deb package + # Create .deb package (also ships the systemd unit, ADR-0005) fpm -s dir -t deb \ -n ceracoder \ -v "${VERSION}" \ @@ -139,7 +139,8 @@ jobs: --depends "libgstreamer1.0-0" \ --depends "libgstreamer-plugins-base1.0-0" \ -p "dist/ceracoder_${VERSION}_${ARCH}.deb" \ - build-output/usr/=/usr/ + build-output/usr/=/usr/ \ + packaging/systemd/ceracoder.service=/usr/lib/systemd/system/ceracoder.service # Create .tar.gz archive mkdir -p tarball/ceracoder-${VERSION} diff --git a/.gitignore b/.gitignore index 4e39018..6150f0c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ ceracoder # Test binaries tests/test_balancer tests/test_integration +tests/test_reconnect +tests/test_frame_liveness tests/test_srt tests/test_srt_integration tests/test_srt_live_transmit @@ -19,3 +21,6 @@ tests/test_srt_live_transmit bindings/typescript/node_modules/ bindings/typescript/dist/ bindings/typescript/*.tsbuildinfo + +# OpenCode agent state (local only) +.omo/ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..10ca5e0 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,67 @@ +# ceracoder + +Parent: [../AGENTS.md](../AGENTS.md) + +## ROLE IN THE GROUP + +GStreamer-based hardware video encoder. Reads a pipeline from file, streams output over SRT with dynamic bitrate control. Fork of [irlserver/belacoder](https://github.com/irlserver/belacoder) (itself from BELABOX/belacoder). + +**Depends on:** `libsrt` (compile-time, `pkg-config srt`), GStreamer 1.0 +**Consumed by:** CeraUI backend (TypeScript bindings at `bindings/typescript`), device image (packaged as `belacoder` .deb) + +## STRUCTURE + +``` +ceracoder/ +├── src/ +│ ├── ceracoder.c # entry point +│ ├── core/ # bitrate control, balancers, config +│ ├── gst/ # encoder_control, overlay_ui +│ ├── io/ # cli_options, pipeline_loader +│ └── net/ # srt_client +├── camlink_workaround/ # submodule → BELABOX/camlink.git +├── bindings/typescript/ # TS bindings consumed by CeraUI backend +├── docs/ # architecture, bitrate-control, dependencies, versioning +├── tests/ # cmocka unit tests +└── Makefile # primary build entry +``` + +## BUILD + +```bash +# Requires: libsrt >= 1.4.0, gstreamer-1.0, gstreamer-app-1.0, cmocka (tests) +make # builds ceracoder binary; inits camlink_workaround submodule +make test # runs cmocka tests +``` + +`VERSION` = `git rev-parse --short HEAD` (short SHA, not semver). Baked in at compile time via `-DVERSION`. + +**pkg-config deps:** `gstreamer-1.0 gstreamer-app-1.0 srt` — all must be present before `make`. + +## SUBMODULE + +`camlink_workaround/` → `https://github.com/BELABOX/camlink.git` +Run `git submodule init && git submodule update` if missing. `make submodule` does this automatically. + +## TYPESCRIPT BINDINGS + +`bindings/typescript/` — consumed by CeraUI backend via `link:../../../ceracoder/bindings/typescript` in its `package.json`. Built with `bun`. Don't edit `dist/` directly; source is in `bindings/typescript/src/`. + +## WHERE TO LOOK + +| Task | Location | +|------|----------| +| Bitrate algorithm | `src/core/bitrate_control.c` + [docs/bitrate-control.md](docs/bitrate-control.md) | +| Architecture overview | [docs/architecture.md](docs/architecture.md) | +| Runtime dependencies | [docs/dependencies.md](docs/dependencies.md) | +| Versioning scheme | [docs/versioning.md](docs/versioning.md) | +| SRT send logic | `src/net/srt_client.c` | +| Pipeline loading | `src/io/pipeline_loader.c` | +| TS bindings source | `bindings/typescript/src/` | + +## ANTI-PATTERNS + +- Don't hardcode a semver string for VERSION — it's always the git short-SHA. +- Don't modify `camlink_workaround/` directly; it's a submodule. +- Don't duplicate docs content here — link to `docs/` instead. +- Don't run `make` without `libsrt >= 1.4.0` installed; pkg-config will fail silently on some systems. diff --git a/Makefile b/Makefile index ccb3954..ee50918 100644 --- a/Makefile +++ b/Makefile @@ -15,9 +15,12 @@ TESTDIR = tests OBJS = $(SRCDIR)/ceracoder.o \ $(SRCDIR)/io/cli_options.o \ $(SRCDIR)/io/pipeline_loader.o \ + $(SRCDIR)/io/sd_notify.o \ $(SRCDIR)/net/srt_client.o \ + $(SRCDIR)/net/srt_reconnect.o \ $(SRCDIR)/gst/encoder_control.o \ $(SRCDIR)/gst/overlay_ui.o \ + $(SRCDIR)/gst/frame_liveness.o \ $(SRCDIR)/core/balancer_runner.o \ $(SRCDIR)/core/bitrate_control.o \ $(SRCDIR)/core/config.o \ @@ -44,10 +47,10 @@ $(SRCDIR)/%.o: $(SRCDIR)/%.c $(CC) $(CFLAGS) -c $< -o $@ # Test targets -test: submodule test_balancer test_integration +test: submodule test_balancer test_integration test_reconnect test_frame_liveness # Full test suite including SRT network tests -test_all: submodule test_balancer test_integration test_srt test_srt_live_transmit +test_all: submodule test_balancer test_integration test_reconnect test_frame_liveness test_srt test_srt_live_transmit test_balancer: $(TESTDIR)/test_balancer.o $(TEST_OBJS) $(CC) $(TEST_CFLAGS) $^ -o $(TESTDIR)/$@ $(TEST_LDFLAGS) @@ -57,6 +60,16 @@ test_integration: $(TESTDIR)/test_integration.o $(TEST_OBJS) $(CC) $(TEST_CFLAGS) $^ -o $(TESTDIR)/$@ $(TEST_LDFLAGS) ./$(TESTDIR)/$@ +# Reconnect state machine tests (pure logic, no live SRT socket / GStreamer) +test_reconnect: $(TESTDIR)/test_reconnect.o $(SRCDIR)/net/srt_reconnect.o + $(CC) $(TEST_CFLAGS) $^ -o $(TESTDIR)/$@ $(TEST_LDFLAGS) + ./$(TESTDIR)/$@ + +# Frame-production liveness tests (pure logic, injectable clock, no GStreamer/SRT) +test_frame_liveness: $(TESTDIR)/test_frame_liveness.o $(SRCDIR)/gst/frame_liveness.o + $(CC) $(TEST_CFLAGS) $^ -o $(TESTDIR)/$@ $(TEST_LDFLAGS) + ./$(TESTDIR)/$@ + # SRT integration tests (requires network, runs actual SRT connections) test_srt: $(TESTDIR)/test_srt_integration.o $(SRCDIR)/net/srt_client.o $(CC) $(TEST_CFLAGS) $^ -o $(TESTDIR)/$@ $(TEST_LDFLAGS) -lpthread @@ -79,7 +92,7 @@ lint: clean: rm -f ceracoder \ $(SRCDIR)/*.o $(SRCDIR)/core/*.o $(SRCDIR)/io/*.o $(SRCDIR)/net/*.o $(SRCDIR)/gst/*.o \ - $(TESTDIR)/*.o $(TESTDIR)/test_balancer $(TESTDIR)/test_integration $(TESTDIR)/test_srt $(TESTDIR)/test_srt_live_transmit camlink_workaround/*.o + $(TESTDIR)/*.o $(TESTDIR)/test_balancer $(TESTDIR)/test_integration $(TESTDIR)/test_reconnect $(TESTDIR)/test_frame_liveness $(TESTDIR)/test_srt $(TESTDIR)/test_srt_live_transmit camlink_workaround/*.o -.PHONY: all submodule clean test test_all test_balancer test_integration test_srt test_srt_live_transmit lint +.PHONY: all submodule clean test test_all test_balancer test_integration test_reconnect test_frame_liveness test_srt test_srt_live_transmit lint diff --git a/bindings/typescript/AGENTS.md b/bindings/typescript/AGENTS.md new file mode 100644 index 0000000..032dcd8 --- /dev/null +++ b/bindings/typescript/AGENTS.md @@ -0,0 +1,38 @@ +# ceracoder/bindings/typescript + +Parent: [../../AGENTS.md](../../AGENTS.md) + +## OVERVIEW + +Package `@ceralive/ceracoder` — type-safe TypeScript bindings for the ceracoder C encoder. Zod v4 schemas, config/CLI builders, process helpers, and a `PipelineBuilder` for hardware-specific GStreamer launch strings. + +**Consumer:** `CeraUI/apps/backend` via `"@ceralive/ceracoder": "link:../../../ceracoder/bindings/typescript"` in its `package.json`. Breaking the public API here breaks CeraUI backend. + +## PUBLIC API SURFACE + +| Export | Source | Purpose | +|--------|--------|---------| +| `ceracoderConfigSchema`, `CeracoderConfig` | `types.ts` | Zod schema + inferred type for ceracoder config | +| `buildCeracoderConfig`, `serializeCeracoderConfig` | `config.ts` | Config object builder + serializer | +| `buildCeracoderArgs` | `cli.ts` | CLI args array builder (always uses `-c `, never legacy `-b`) | +| `spawnCeracoder`, `sendHup`, `sendTerm` | `process.ts` | Process lifecycle helpers | +| `writeConfig`, `writePipeline` | `process.ts` | Write config/pipeline files to disk | +| `PipelineBuilder` | `pipeline/` | Hardware-specific GStreamer launch string generator | +| Constants (`DEFAULT_*`) | `constants.ts` | Bitrate/latency defaults aligned with C implementation | + +## WHERE TO LOOK + +| Task | Location | +|------|----------| +| Add/change a config field | `src/types.ts` (schema) + `src/config.ts` (defaults) + `src/constants.ts` | +| Add a new pipeline hardware target | `src/pipeline/` — add hardware dir + update `PipelineBuilder` | +| Change CLI flags | `src/cli.ts` — check C source `src/io/cli_options.c` stays in sync | +| Build / type-check | `bun run build` or `bun run lint` | + +## ANTI-PATTERNS + +- Don't edit `dist/` directly — generated by `bun run build` from `src/`. +- Don't remove or rename exported symbols without updating `CeraUI/apps/backend` first. +- Don't add a `-b` bitrate flag to `buildCeracoderArgs` — legacy, removed intentionally. +- Don't publish to npm; this package is consumed only via `link:` from sibling checkout. +- Don't bump `version` in `package.json` independently — ceracoder uses git-SHA versioning. diff --git a/packaging/systemd/ceracoder.service b/packaging/systemd/ceracoder.service new file mode 100644 index 0000000..cf9aaf0 --- /dev/null +++ b/packaging/systemd/ceracoder.service @@ -0,0 +1,47 @@ +[Unit] +Description=Ceracoder - SRT video encoder with dynamic bitrate (CeraLive) +Documentation=https://github.com/CERALIVE/ceracoder +# Bonding link layer must be up first; ceracoder sends SRT to the local srtla_send. +After=network-online.target srtla-send.service +Wants=network-online.target +# Crash-loop damping (ADR-0005): if ceracoder exits >= StartLimitBurst times +# within StartLimitIntervalSec, systemd stops respawning and enters the failed +# state instead of a tight restart loop. The streaming failure is then surfaced +# to the operator rather than hammering the encoder. +StartLimitIntervalSec=60 +StartLimitBurst=5 + +[Service] +# Type=notify: ceracoder calls sd_notify(READY=1) once the pipeline is PLAYING +# and the SRT connection is established, so systemd only considers the unit +# "active" when the encoder is actually running. +Type=notify +NotifyAccess=main + +# Per-stream arguments (pipeline file, SRT host/port, stream id, bitrate config) +# are written by the CeraUI control plane before it starts this unit. The "-" +# prefix makes the file optional so the unit still validates when absent. +EnvironmentFile=-/run/ceralive/ceracoder.env +ExecStart=/usr/bin/ceracoder $CERACODER_ARGS + +# systemd is the SOLE process-restart authority (ADR-0005). ceracoder's +# in-process SRT reconnect absorbs transient link blips WITHOUT exiting; only a +# permanent failure exits non-zero and hands authority here. +Restart=on-failure +RestartSec=5 + +# Hardware-style watchdog: ceracoder pets WATCHDOG=1 from its GLib main loop. +# If the process hangs/zombies (main loop stuck, or - after Task 12 - frame +# production stalls), the ping stops, systemd kills the process and Restart= +# respawns it. +# +# WatchdogSec MUST exceed ceracoder's in-process SRT reconnect backoff cap +# (30s, Task 8) so a legitimate reconnect window is never mistaken for a hang. +# 90s = 30s cap + headroom. +WatchdogSec=90 + +KillMode=mixed +SyslogIdentifier=ceracoder + +[Install] +WantedBy=multi-user.target diff --git a/src/ceracoder.c b/src/ceracoder.c index a5e60df..9d8f53a 100644 --- a/src/ceracoder.c +++ b/src/ceracoder.c @@ -37,11 +37,14 @@ #include "cli_options.h" #include "config.h" #include "srt_client.h" +#include "srt_reconnect.h" #include "pipeline_loader.h" #include "encoder_control.h" #include "overlay_ui.h" #include "balancer_runner.h" #include "bitrate_control.h" +#include "sd_notify.h" +#include "frame_liveness.h" // SRT ACK timeout #define SRT_ACK_TIMEOUT 6000 // maximum interval between received ACKs before the connection is TOed @@ -74,6 +77,49 @@ static int quit = 0; static int av_delay = 0; static int srt_pkt_size = DEFAULT_SRT_PKT_SIZE; +/* + * In-process SRT reconnect state (ADR-0005 device supervision model). + * + * On a transient SRT drop (send failure or ACK timeout) ceracoder retries the + * connection in-process with exponential backoff instead of hard-exiting, so a + * brief link blip no longer tears down the stream. The process exits non-zero + * only when the bounded reconnect window is exhausted or a permanent reject is + * classified, handing restart authority to systemd (Restart=on-failure). + */ +static ReconnectController reconnect_ctrl; +// Connection target captured at startup so the reconnect path can redial. +static const char *srt_host = NULL; +static const char *srt_port = NULL; +static const char *srt_stream_id = NULL; +static int srt_latency_ms = 0; +// Set by the appsink streaming thread on send failure; observed by the main +// loop's housekeeping to drive the reconnect. Treated like `quit`: a simple +// cross-thread flag in the style this codebase already uses. +static volatile sig_atomic_t srt_lost = 0; +// ACK-liveness tracking (file scope so a successful reconnect can reset it +// against the fresh socket's zeroed counters). +static uint64_t prev_ack_ts = 0; +static uint64_t prev_ack_count = 0; +// Process exit code: 0 for clean shutdown (SIGTERM/SIGINT), non-zero when an +// unrecoverable SRT failure must escalate to the systemd supervisor. +static volatile sig_atomic_t exit_code = EXIT_SUCCESS; + +/* + * Frame-production liveness (ADR-0005 device supervision model, "zombie-encode" + * detection). Updated once per produced frame from the appsink streaming thread + * (new_buf_cb) and read from the main loop's watchdog ping. Process liveness + * alone cannot see an encoder that is alive with the SRT link up yet producing + * no frames; this tracks whether buffers are still advancing through appsink. + * The two shared scalars inside are C11 relaxed atomics, so this cross-thread + * access needs no lock. The watchdog ping is gated on it: a stalled pipeline + * withholds WATCHDOG=1 and systemd respawns the process. + */ +static FrameLiveness frame_liveness; + +// Forward declarations for the reconnect machinery. +static void srt_reconnect_start(void); +gboolean srt_reconnect_attempt_cb(gpointer user_data); + // Configuration static BelacoderConfig g_config; static char *bitrate_filename = NULL; @@ -90,6 +136,25 @@ uint64_t getms() { return ((uint64_t)ts.tv_sec * 1000) + ((uint64_t)ts.tv_nsec / 1000000); } +/* + Frame-production liveness health surface (ADR-0005). Non-static so the health + RPC (Task 13) can consume the same signal the watchdog ping gates on without + reaching into the FrameLiveness struct. Thin wrappers over the pure tracker + sampled against the live monotonic clock. + + ceracoder_frames_advancing(): false once the encoder has produced no frame for + the configured stall threshold (default 3s) — i.e. a zombie-encode — true + while frames keep flowing. + ceracoder_frame_count(): total frames produced this run (monotonic counter). +*/ +bool ceracoder_frames_advancing(void) { + return frame_liveness_advancing_at(&frame_liveness, getms()); +} + +uint64_t ceracoder_frame_count(void) { + return frame_liveness_count(&frame_liveness); +} + // Parse a string to long with full error checking static int parse_long(const char *str, long *result, long min_val, long max_val) { if (str == NULL || *str == '\0') { @@ -119,6 +184,20 @@ static int parse_long(const char *str, long *result, long min_val, long max_val) return 0; } +// Read a bounded integer from the environment, falling back to def when unset +// or invalid. Used to override the reconnect window/backoff without touching the +// INI config schema (keeps the stable TypeScript bindings unchanged). +static long env_long(const char *name, long def, long lo, long hi) { + const char *v = getenv(name); + if (v == NULL) return def; + long out; + if (parse_long(v, &out, lo, hi) != 0) { + fprintf(stderr, "Ignoring invalid %s=%s; using %ld\n", name, v, def); + return def; + } + return out; +} + // Forward declaration int read_bitrate_file(void); @@ -131,6 +210,17 @@ void stop() { } } +/* Stop with a specific process exit code. A non-zero code escalates an + unrecoverable failure to the systemd supervisor (Restart=on-failure), + per ADR-0005. The first non-zero code wins so it survives a later clean + stop() triggered while the main loop unwinds. */ +void stop_with_code(int code) { + if (code != EXIT_SUCCESS && exit_code == EXIT_SUCCESS) { + exit_code = code; + } + stop(); +} + // Async-signal-safe handler for SIGHUP void sighup_handler(int sig) { (void)sig; @@ -263,11 +353,96 @@ void do_bitrate_update(SRT_TRACEBSTATS *stats, uint64_t ctime) { encoder_control_set_bitrate(&encoder_ctrl, output.new_bitrate); } +/* + One in-process reconnect attempt, scheduled as a one-shot GLib timeout after + the backoff delay (keeps the main loop signal-responsive between attempts). + Adaptive-bitrate and encoder state are intentionally untouched — only the SRT + socket is rebuilt. Re-arms itself for the next attempt; on a permanent reject + or exhausted window it escalates to systemd via a clean non-zero exit. +*/ +gboolean srt_reconnect_attempt_cb(gpointer user_data) { + (void)user_data; + if (quit) return G_SOURCE_REMOVE; + + int attempt = reconnect_attempt_count(&reconnect_ctrl); + int ret = srt_client_connect(&srt_client, srt_host, srt_port, + srt_stream_id, srt_latency_ms, srt_pkt_size); + if (ret == 0) { + reconnect_succeeded(&reconnect_ctrl); + // Reset ACK-liveness so the dead socket's stale timestamp can't immediately + // re-trigger the timeout against the fresh socket's zeroed counters. + prev_ack_ts = 0; + prev_ack_count = 0; + srt_lost = 0; + fprintf(stderr, "SRT reconnected on attempt %d (total reconnects this run: %d)\n", + attempt, reconnect_total_reconnects(&reconnect_ctrl)); + return G_SOURCE_REMOVE; + } + + // A reject the server will not retract: stop retrying, hand off to systemd. + if (reconnect_reason_is_permanent(ret)) { + fprintf(stderr, "SRT reconnect: permanent failure (reason %d); " + "exiting for supervisor restart\n", ret); + stop_with_code(EXIT_FAILURE); + return G_SOURCE_REMOVE; + } + + long backoff = reconnect_next_backoff_ms(&reconnect_ctrl); + if (backoff < 0) { + fprintf(stderr, "SRT reconnect window exhausted after %d attempts; " + "exiting for supervisor restart\n", attempt); + stop_with_code(EXIT_FAILURE); + return G_SOURCE_REMOVE; + } + + fprintf(stderr, "SRT reconnect attempt %d failed (reason %d); " + "retrying in %ld ms\n", attempt, ret, backoff); + g_timeout_add((guint)backoff, srt_reconnect_attempt_cb, NULL); + return G_SOURCE_REMOVE; +} + +/* Begin a reconnect episode: drop the dead socket, arm the backoff machine, and + schedule the first attempt. Idempotent while a reconnect is already running. */ +static void srt_reconnect_start(void) { + if (quit) return; + if (reconnect_is_reconnecting(&reconnect_ctrl)) return; + + srt_client_close(&srt_client); + reconnect_begin(&reconnect_ctrl); + + long backoff = reconnect_next_backoff_ms(&reconnect_ctrl); + if (backoff < 0) { + fprintf(stderr, "SRT reconnect disabled by configuration; " + "exiting for supervisor restart\n"); + stop_with_code(EXIT_FAILURE); + return; + } + + fprintf(stderr, "SRT connection lost; starting in-process reconnect " + "(attempt %d in %ld ms, backoff cap %u ms, %s window)\n", + reconnect_attempt_count(&reconnect_ctrl), backoff, + reconnect_ctrl.max_backoff_ms, + reconnect_ctrl.max_attempts > 0 ? "bounded" : "unlimited"); + g_timeout_add((guint)backoff, srt_reconnect_attempt_cb, NULL); +} + gboolean connection_housekeeping(gpointer user_data) { (void)user_data; + if (quit) return TRUE; + uint64_t ctime = getms(); - static uint64_t prev_ack_ts = 0; - static uint64_t prev_ack_count = 0; + + // Socket is closed/redialing during a reconnect — skip stats until it settles. + if (reconnect_is_reconnecting(&reconnect_ctrl)) { + return TRUE; + } + + // srt_lost is raised by the appsink thread; the main loop owns the reconnect + // so every GLib timer is scheduled from a single thread. + if (srt_lost) { + srt_reconnect_start(); + return TRUE; + } // SRT stats SRT_TRACEBSTATS stats; @@ -279,10 +454,13 @@ gboolean connection_housekeeping(gpointer user_data) { prev_ack_count = stats.pktRecvACKTotal; prev_ack_ts = ctime; } - /* Manual check for connection timeout */ + /* ACK timeout now triggers in-process reconnect instead of a hard exit, so a + transient ACK gap no longer tears down the stream (ADR-0005). */ if (prev_ack_count != 0 && (ctime - prev_ack_ts) > SRT_ACK_TIMEOUT) { - fprintf(stderr, "The SRT connection timed out, exiting\n"); - stop(); + fprintf(stderr, "The SRT connection timed out; attempting in-process reconnect\n"); + srt_lost = 1; + srt_reconnect_start(); + return TRUE; } // Update bitrate when we have a configurable encoder @@ -302,6 +480,14 @@ GstFlowReturn new_buf_cb(GstAppSink *sink, gpointer user_data) { GstSample *sample = gst_app_sink_pull_sample(sink); if (!sample) return GST_FLOW_ERROR; + /* A buffer reached appsink => the pipeline produced a frame. Record liveness + here (before any SRT logic) so frame *production* is tracked independently + of transmission: encoded frames still count while the SRT link is down and + packets are being dropped, distinguishing a network blip (handled by the + reconnect machine) from a zombie-encode (handled by the watchdog). Cheap: + a relaxed atomic store + increment (ADR-0005, Task 12). */ + frame_liveness_record(&frame_liveness, getms()); + GstBuffer *buffer = NULL; GstMapInfo map = {0}; @@ -316,14 +502,16 @@ GstFlowReturn new_buf_cb(GstAppSink *sink, gpointer user_data) { pkt_len += copy_sz; if (pkt_len == srt_pkt_size) { - int nb = srt_client_send(&srt_client, pkt, srt_pkt_size); - if (nb != srt_pkt_size) { - if (!quit) { - fprintf(stderr, "The SRT connection failed, exiting\n"); - stop(); + /* While the connection is down we drop encoded packets rather than block + or send on a dead socket; the main loop reconnects in-process. On a send + failure we only raise srt_lost (no stop()) — systemd, not ceracoder, + owns process restart (ADR-0005). The pipeline stays alive either way. */ + if (!srt_lost) { + int nb = srt_client_send(&srt_client, pkt, srt_pkt_size); + if (nb != srt_pkt_size && !quit) { + fprintf(stderr, "SRT send failed; flagging in-process reconnect\n"); + srt_lost = 1; } - code = GST_FLOW_ERROR; - goto ret; } pkt_len = 0; } @@ -331,7 +519,6 @@ GstFlowReturn new_buf_cb(GstAppSink *sink, gpointer user_data) { sample_sz -= copy_sz; } while(sample_sz); -ret: gst_buffer_unmap(buffer, &map); gst_sample_unref(sample); @@ -437,9 +624,53 @@ void cb_pipeline (GstBus *bus, GstMessage *message, gpointer user_data) { } } -// Only called if the pipeline failed to stop +// Only called if the pipeline failed to stop in time. Preserve the chosen exit +// code so an unrecoverable-failure shutdown still exits non-zero for systemd, +// while a SIGINT/SIGTERM shutdown stays clean (exit_code defaults to SUCCESS). void cb_sigalarm(int signum) { - _exit(EXIT_SUCCESS); // exiting deliberately following SIGINT or SIGTERM + (void)signum; + _exit(exit_code); +} + +/* + Pets the systemd watchdog (WatchdogSec=) from the GLib main loop. + + Per ADR-0005, systemd is the sole process-restart authority; ceracoder pets + the watchdog so a hung or zombie process is killed and respawned. Two liveness + conditions must BOTH hold to emit the ping: + + 1. The GLib main loop is still dispatching — reaching this callback proves + it (catches a hung/stuck main loop). + 2. The encoder is still producing frames — ceracoder_frames_advancing() + (catches a zombie-encode: process alive, SRT possibly up, but no encoded + frames for the stall threshold). + + When frames have stalled we deliberately WITHHOLD WATCHDOG=1: the keep-alive + stops, WatchdogSec elapses, and systemd kills + respawns the process. This is + the frame-production health signal feeding the single restart authority — we + never respawn independently. No-op when not supervised by systemd + (NOTIFY_SOCKET unset). +*/ +gboolean watchdog_ping(gpointer user_data) { + (void)user_data; + if (quit) { + return G_SOURCE_REMOVE; + } + if (ceracoder_frames_advancing()) { + sd_notify_watchdog(); + } else { + /* Zombie-encode: main loop alive but no frames produced for the stall + threshold. Withhold the ping so WatchdogSec fires (ADR-0005). Logged once + per missed ping (the watchdog interval) so the supervisor restart cause + is visible in the journal without flooding it per frame. */ + fprintf(stderr, + "Frame production stalled (no encoded frames for >= %llu ms, " + "%llu frames this run); withholding systemd watchdog ping so the " + "supervisor restarts us\n", + (unsigned long long)frame_liveness_threshold_ms(&frame_liveness), + (unsigned long long)ceracoder_frame_count()); + } + return TRUE; } #define FIXED_ARGS 3 @@ -503,6 +734,21 @@ int main(int argc, char** argv) { int srt_latency = (opts.srt_latency != 2000) ? opts.srt_latency : (g_config.srt_latency > 0 ? g_config.srt_latency : 2000); + // Capture the SRT target + arm the in-process reconnect machine so a transient + // loss can redial without a process restart (ADR-0005). Default = bounded + // window; pass RECONNECT_UNLIMITED to never escalate transient loss. + srt_host = opts.srt_host; + srt_port = opts.srt_port; + srt_stream_id = opts.stream_id; + srt_latency_ms = srt_latency; + unsigned int rc_base = (unsigned int)env_long("CERACODER_RECONNECT_BASE_MS", + RECONNECT_DEFAULT_BASE_MS, 1, 600000); + unsigned int rc_cap = (unsigned int)env_long("CERACODER_RECONNECT_MAX_MS", + RECONNECT_DEFAULT_MAX_MS, 1, 600000); + int rc_attempts = (int)env_long("CERACODER_RECONNECT_MAX_ATTEMPTS", + RECONNECT_DEFAULT_MAX_ATTEMPTS, 0, 1000000); + reconnect_init(&reconnect_ctrl, rc_base, rc_cap, rc_attempts); + // Initialize balancer if (balancer_runner_init(&balancer_runner, &g_config, opts.balancer_name, srt_latency, srt_pkt_size) != 0) { @@ -599,8 +845,39 @@ int main(int argc, char** argv) { signal(SIGALRM, cb_sigalarm); g_timeout_add(1000, stall_check, NULL); + /* Arm frame-production liveness just before the pipeline starts so the + startup baseline (time-to-first-frame budget) begins at PLAYING, not during + the blocking SRT connect loop above. Threshold is env-overridable + (CERACODER_FRAME_STALL_MS) like the reconnect knobs, leaving the INI schema + and TS bindings untouched (ADR-0005, Task 12). */ + unsigned long fl_stall_ms = (unsigned long)env_long( + "CERACODER_FRAME_STALL_MS", FRAME_LIVENESS_DEFAULT_STALL_MS, 100, 600000); + frame_liveness_init(&frame_liveness, fl_stall_ms, getms()); + // Start pipeline gst_element_set_state((GstElement*)gst_pipeline, GST_STATE_PLAYING); + + /* Startup is complete: pipeline is PLAYING and (when using appsink) the SRT + connection is established. Tell systemd we are READY so a Type=notify unit + leaves the "activating" state. No-op when not run under systemd. */ + sd_notify_ready(); + + /* If systemd configured a watchdog for this unit (WatchdogSec=), pet it from + the main loop at half the configured interval (the interval systemd + recommends). If the main loop hangs/zombies, the ping stops and systemd + kills + respawns the process (ADR-0005). WatchdogSec is intentionally set + larger than ceracoder's in-process SRT reconnect backoff cap so a + legitimate reconnect is never mistaken for a hang. */ + unsigned long long wd_usec = sd_watchdog_usec(); + if (wd_usec > 0) { + guint wd_interval_ms = (guint)((wd_usec / 1000ULL) / 2ULL); + if (wd_interval_ms < 1) wd_interval_ms = 1; + g_timeout_add(wd_interval_ms, watchdog_ping, NULL); + fprintf(stderr, + "systemd watchdog enabled: petting every %u ms (WatchdogSec=%llu s)\n", + wd_interval_ms, wd_usec / 1000000ULL); + } + g_main_loop_run(loop); // Cleanup @@ -610,5 +887,7 @@ int main(int argc, char** argv) { balancer_runner_cleanup(&balancer_runner); pipeline_file_unload(&pfile); - return 0; + // Non-zero on unrecoverable SRT failure so systemd (Restart=on-failure) + // respawns; zero on a clean SIGINT/SIGTERM shutdown (ADR-0005). + return exit_code; } diff --git a/src/gst/frame_liveness.c b/src/gst/frame_liveness.c new file mode 100644 index 0000000..196480f --- /dev/null +++ b/src/gst/frame_liveness.c @@ -0,0 +1,60 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include "frame_liveness.h" + +void frame_liveness_init(FrameLiveness *fl, uint64_t stall_threshold_ms, + uint64_t now_ms) { + atomic_store_explicit(&fl->frame_count, 0, memory_order_relaxed); + /* Seed the "last frame" baseline with startup time so the time-to-first-frame + is bounded by the same threshold as a mid-stream stall (ADR-0005). */ + atomic_store_explicit(&fl->last_frame_ms, now_ms, memory_order_relaxed); + fl->stall_threshold_ms = + (stall_threshold_ms != 0) ? stall_threshold_ms + : FRAME_LIVENESS_DEFAULT_STALL_MS; +} + +void frame_liveness_record(FrameLiveness *fl, uint64_t now_ms) { + /* Hot path: one produced frame. Keep it to a relaxed counter bump + a + relaxed timestamp store — no lock, no allocation, no syscall. */ + atomic_fetch_add_explicit(&fl->frame_count, 1, memory_order_relaxed); + atomic_store_explicit(&fl->last_frame_ms, now_ms, memory_order_relaxed); +} + +bool frame_liveness_advancing_at(const FrameLiveness *fl, uint64_t now_ms) { + uint64_t last = atomic_load_explicit(&fl->last_frame_ms, memory_order_relaxed); + /* Defend against a non-monotonic / racing now_ms that predates the last + recorded frame: that is not evidence of a stall, so report advancing. */ + if (now_ms <= last) { + return true; + } + return (now_ms - last) < fl->stall_threshold_ms; +} + +uint64_t frame_liveness_count(const FrameLiveness *fl) { + return atomic_load_explicit(&fl->frame_count, memory_order_relaxed); +} + +uint64_t frame_liveness_last_frame_ms(const FrameLiveness *fl) { + return atomic_load_explicit(&fl->last_frame_ms, memory_order_relaxed); +} + +uint64_t frame_liveness_threshold_ms(const FrameLiveness *fl) { + return fl->stall_threshold_ms; +} diff --git a/src/gst/frame_liveness.h b/src/gst/frame_liveness.h new file mode 100644 index 0000000..83263bd --- /dev/null +++ b/src/gst/frame_liveness.h @@ -0,0 +1,112 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#ifndef CERACODER_FRAME_LIVENESS_H +#define CERACODER_FRAME_LIVENESS_H + +#include +#include +#include + +/* + * Frame-production liveness tracker (ADR-0005 device supervision model). + * + * This module is the PURE, testable core of ceracoder's "zombie-encode" + * detection. Per ADR-0005 the device health-check is process-alive + frame + * production + bond-up; this owns the *frame production* signal. The encoder + * pipeline can wedge in a state where the process is alive and the SRT link is + * up, yet no encoded frames are produced (capture stall, encoder hang). Process + * liveness alone cannot see this — only "are buffers still advancing through + * appsink" can. + * + * The tracker stores two things, updated cheaply once per produced frame + * (appsink buffer) from the GStreamer streaming thread: + * - a monotonic-clock timestamp of the most recent frame, and + * - a cumulative frame counter. + * A reader on the main loop thread (the systemd watchdog ping) asks + * frame_liveness_advancing_at(): true while a frame has arrived within the + * stall threshold, false once production has stalled for >= threshold ms. The + * watchdog ping is gated on this, so a zombie-encode stops petting WatchdogSec + * and systemd respawns the process (no independent restart authority). + * + * Cross-thread safety: the producer (record) and the reader (advancing/count) + * run on different threads, so the two shared scalars are C11 relaxed atomics. + * Relaxed ordering is sufficient — we only need a coherent per-field view of a + * timestamp and a counter, not ordering against other memory. On the target + * (aarch64 Jetson Nano) these compile to plain aligned load/store, so the + * per-frame cost is just a store + an increment (no lock, no syscall). + * + * This translation unit has NO dependency on GStreamer, SRT, or a real clock: + * the "now" is injected as a parameter, so the stall math and threshold timing + * are unit-tested with cmocka deterministically (no sleeping). ceracoder.c owns + * the singleton instance, samples CLOCK_MONOTONIC via getms(), and exposes the + * ceracoder_frames_advancing() / ceracoder_frame_count() health surface that + * the watchdog ping (Task 12) and the health RPC (Task 13) consume. + */ + +/* Default stall threshold: no produced frame for this many ms => not advancing + * (zombie-encode). 3s comfortably exceeds a single frame interval at any sane + * framerate (>=1 fps) yet reacts well within a typical WatchdogSec window. */ +#define FRAME_LIVENESS_DEFAULT_STALL_MS 3000u + +typedef struct { + _Atomic uint64_t frame_count; /* cumulative produced frames (health counter) */ + _Atomic uint64_t last_frame_ms; /* monotonic ms of the most recent frame */ + uint64_t stall_threshold_ms; /* write-once at init; read-only thereafter */ +} FrameLiveness; + +/* + * Initialize the tracker. + * stall_threshold_ms: 0 selects FRAME_LIVENESS_DEFAULT_STALL_MS. + * now_ms: current monotonic clock; seeds last_frame_ms so the + * interval from startup to the first frame is itself + * bounded by the threshold (a pipeline that never produces + * a first frame is flagged stalled after the grace + * window, exactly like a mid-stream stall). + */ +void frame_liveness_init(FrameLiveness *fl, uint64_t stall_threshold_ms, + uint64_t now_ms); + +/* + * Record one produced frame at monotonic time now_ms. Called once per appsink + * buffer from the streaming thread. Intentionally minimal: one relaxed atomic + * store + one relaxed atomic increment, no allocation or syscall. + */ +void frame_liveness_record(FrameLiveness *fl, uint64_t now_ms); + +/* + * True iff a frame has been produced within the last stall_threshold_ms as of + * now_ms (strictly: now_ms - last_frame_ms < threshold). At exactly the + * threshold the stream is considered stalled (returns false). A non-monotonic + * now_ms earlier than last_frame_ms is treated as "fresh" (advancing) rather + * than reporting a spurious stall. + */ +bool frame_liveness_advancing_at(const FrameLiveness *fl, uint64_t now_ms); + +/* Cumulative count of produced frames since init. */ +uint64_t frame_liveness_count(const FrameLiveness *fl); + +/* Monotonic ms of the most recently produced frame (or the init baseline if no + * frame has been produced yet). */ +uint64_t frame_liveness_last_frame_ms(const FrameLiveness *fl); + +/* The configured stall threshold in ms (resolved default included). */ +uint64_t frame_liveness_threshold_ms(const FrameLiveness *fl); + +#endif /* CERACODER_FRAME_LIVENESS_H */ diff --git a/src/io/sd_notify.c b/src/io/sd_notify.c new file mode 100644 index 0000000..897e5fd --- /dev/null +++ b/src/io/sd_notify.c @@ -0,0 +1,96 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. +*/ + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include "sd_notify.h" + +#include +#include +#include +#include +#include +#include +#include + +/* + Send a single sd_notify state string to $NOTIFY_SOCKET. + + The protocol (sd_notify(3)): connect-less AF_UNIX SOCK_DGRAM sendto() of a + newline-separated set of VAR=value assignments. NOTIFY_SOCKET is either a + filesystem path ("/run/...") or an abstract socket ("@...", first byte NUL). +*/ +static int sd_notify_send(const char *state) { + const char *path = getenv("NOTIFY_SOCKET"); + if (path == NULL || (path[0] != '/' && path[0] != '@')) { + /* Not supervised by systemd notify: harmless no-op. */ + return 0; + } + + size_t path_len = strlen(path); + struct sockaddr_un sa; + if (path_len >= sizeof(sa.sun_path)) { + return -E2BIG; + } + + int fd = socket(AF_UNIX, SOCK_DGRAM | SOCK_CLOEXEC, 0); + if (fd < 0) { + return -errno; + } + + memset(&sa, 0, sizeof(sa)); + sa.sun_family = AF_UNIX; + memcpy(sa.sun_path, path, path_len + 1); + + socklen_t sa_len; + if (sa.sun_path[0] == '@') { + /* Abstract namespace: leading '@' becomes a NUL and is NOT counted with a + trailing terminator. */ + sa.sun_path[0] = '\0'; + sa_len = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + path_len); + } else { + sa_len = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + path_len + 1); + } + + ssize_t n; + do { + n = sendto(fd, state, strlen(state), MSG_NOSIGNAL, (struct sockaddr *)&sa, + sa_len); + } while (n < 0 && errno == EINTR); + + int ret = (n < 0) ? -errno : 1; + close(fd); + return ret; +} + +int sd_notify_ready(void) { return sd_notify_send("READY=1\n"); } + +int sd_notify_watchdog(void) { return sd_notify_send("WATCHDOG=1\n"); } + +unsigned long long sd_watchdog_usec(void) { + const char *e = getenv("WATCHDOG_USEC"); + if (e == NULL || *e == '\0') { + return 0ULL; + } + + int saved_errno = errno; + errno = 0; + char *end = NULL; + unsigned long long v = strtoull(e, &end, 10); + if (errno != 0 || end == e) { + errno = saved_errno; + return 0ULL; + } + errno = saved_errno; + return v; +} diff --git a/src/io/sd_notify.h b/src/io/sd_notify.h new file mode 100644 index 0000000..6697ac8 --- /dev/null +++ b/src/io/sd_notify.h @@ -0,0 +1,45 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. +*/ + +#ifndef CERACODER_SD_NOTIFY_H +#define CERACODER_SD_NOTIFY_H + +/* + Minimal, dependency-free sd_notify(3) implementation. + + Talks directly to the AF_UNIX datagram socket named by $NOTIFY_SOCKET, which + systemd sets for units running with Type=notify and/or WatchdogSec=. This + avoids linking libsystemd, keeping the device build (Makefile, no extra + pkg-config dep) unchanged. + + All functions are safe no-ops when the process is NOT supervised by systemd + (NOTIFY_SOCKET unset), so ceracoder still runs identically when launched + directly (e.g. from a shell or, today, child-spawned by the CeraUI backend). + + See ADR-0005 (device supervision & restart-authority model): systemd owns + process restart; ceracoder pets WatchdogSec from its main loop so a hung or + zombie process is killed and respawned. +*/ + +/* Send "READY=1": tells systemd that startup has completed. Required for + Type=notify units. Returns >0 on success, 0 if not under systemd, <0 on + error. */ +int sd_notify_ready(void); + +/* Send "WATCHDOG=1": keep-alive ping that resets the systemd watchdog timer. + Same return convention as sd_notify_ready(). */ +int sd_notify_watchdog(void); + +/* Return the configured watchdog interval in microseconds (from + $WATCHDOG_USEC), or 0 if the watchdog is not enabled for this unit. */ +unsigned long long sd_watchdog_usec(void); + +#endif /* CERACODER_SD_NOTIFY_H */ diff --git a/src/net/srt_reconnect.c b/src/net/srt_reconnect.c new file mode 100644 index 0000000..8b24b44 --- /dev/null +++ b/src/net/srt_reconnect.c @@ -0,0 +1,142 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#include "srt_reconnect.h" + +#include +#include + +unsigned int reconnect_backoff_for_attempt(unsigned int base_ms, + unsigned int max_ms, + int attempt) { + if (base_ms == 0u) base_ms = RECONNECT_DEFAULT_BASE_MS; + if (max_ms == 0u) max_ms = RECONNECT_DEFAULT_MAX_MS; + if (max_ms < base_ms) max_ms = base_ms; + + /* attempt is 1-indexed; attempt <= 1 yields the base backoff. */ + if (attempt <= 1) { + return (base_ms < max_ms) ? base_ms : max_ms; + } + + /* Overflow-safe doubling: stop as soon as we reach/exceed the cap. */ + unsigned long backoff = base_ms; + for (int i = 1; i < attempt; i++) { + backoff <<= 1; + if (backoff >= max_ms) { + return max_ms; + } + } + return (unsigned int)backoff; +} + +void reconnect_init(ReconnectController *rc, unsigned int base_ms, + unsigned int max_ms, int max_attempts) { + rc->state = RECONNECT_STATE_CONNECTED; + rc->attempt = 0; + rc->total_reconnects = 0; + rc->max_attempts = max_attempts; + rc->base_backoff_ms = (base_ms != 0u) ? base_ms : RECONNECT_DEFAULT_BASE_MS; + rc->max_backoff_ms = (max_ms != 0u) ? max_ms : RECONNECT_DEFAULT_MAX_MS; + if (rc->max_backoff_ms < rc->base_backoff_ms) { + rc->max_backoff_ms = rc->base_backoff_ms; + } + rc->last_backoff_ms = 0; +} + +void reconnect_begin(ReconnectController *rc) { + rc->state = RECONNECT_STATE_RECONNECTING; + rc->attempt = 0; + rc->last_backoff_ms = 0; +} + +long reconnect_next_backoff_ms(ReconnectController *rc) { + if (rc->state == RECONNECT_STATE_FAILED) { + return -1; + } + + /* Bounded window: once every allowed attempt has been consumed, the window + * is exhausted -> escalate to systemd via a clean non-zero exit. A + * non-positive max_attempts means "unlimited" and never exhausts. */ + if (rc->max_attempts > 0 && rc->attempt >= rc->max_attempts) { + rc->state = RECONNECT_STATE_FAILED; + return -1; + } + + rc->attempt += 1; + rc->state = RECONNECT_STATE_RECONNECTING; + rc->last_backoff_ms = reconnect_backoff_for_attempt( + rc->base_backoff_ms, rc->max_backoff_ms, rc->attempt); + return (long)rc->last_backoff_ms; +} + +void reconnect_succeeded(ReconnectController *rc) { + rc->state = RECONNECT_STATE_CONNECTED; + rc->attempt = 0; + rc->last_backoff_ms = 0; + rc->total_reconnects += 1; +} + +int reconnect_is_reconnecting(const ReconnectController *rc) { + return rc->state == RECONNECT_STATE_RECONNECTING; +} + +int reconnect_attempt_count(const ReconnectController *rc) { + return rc->attempt; +} + +int reconnect_total_reconnects(const ReconnectController *rc) { + return rc->total_reconnects; +} + +ReconnectState reconnect_get_state(const ReconnectController *rc) { + return rc->state; +} + +int reconnect_reason_is_permanent(int reason) { + switch (reason) { + /* --- Internal SRT_REJECT_REASON codes that will not self-heal --- */ + case SRT_REJ_ROGUE: /* malformed handshake: misconfig, not transient */ + case SRT_REJ_VERSION: /* peer below minimum version */ + case SRT_REJ_RDVCOOKIE: /* rendezvous cookie collision */ + case SRT_REJ_BADSECRET: /* wrong passphrase */ + case SRT_REJ_UNSECURE: /* passphrase required/unexpected */ + case SRT_REJ_MESSAGEAPI: /* stream/message API mismatch */ + case SRT_REJ_CONGESTION: /* incompatible congestion controller */ + case SRT_REJ_FILTER: /* incompatible packet filter */ + case SRT_REJ_GROUP: /* incompatible group */ +#ifdef ENABLE_AEAD_API_PREVIEW + case SRT_REJ_CRYPTO: /* conflicting crypto configuration */ +#endif + /* --- Predefined access-control rejects (server said no on purpose) --- */ + case SRT_REJX_BAD_REQUEST: /* 1400: malformed streamid */ + case SRT_REJX_UNAUTHORIZED: /* 1401: auth failed */ + case SRT_REJX_FORBIDDEN: /* 1403: access denied (invalid streamid) */ + case SRT_REJX_NOTFOUND: /* 1404: resource not found */ + case SRT_REJX_BAD_MODE: /* 1405: unsupported mode */ + case SRT_REJX_UNACCEPTABLE: /* 1406: unsatisfiable parameters */ + case SRT_REJX_CONFLICT: /* 1409: streamid already in use */ + return 1; + + /* Everything else (timeout, peer-busy, resource, backlog, system, close, + * DNS/socket/sockopt local-setup errors) is treated as transient and is + * retried within the bounded window. */ + default: + return 0; + } +} diff --git a/src/net/srt_reconnect.h b/src/net/srt_reconnect.h new file mode 100644 index 0000000..44e1493 --- /dev/null +++ b/src/net/srt_reconnect.h @@ -0,0 +1,132 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +#ifndef SRT_RECONNECT_H +#define SRT_RECONNECT_H + +/* + * SRT reconnect state machine (ADR-0005: device supervision model). + * + * This module is the PURE, testable core of ceracoder's in-process SRT + * reconnect policy. It owns the "transient SRT loss" failure class: instead of + * hard-exiting on the first send failure or ACK timeout, ceracoder drives this + * state machine to retry the SRT connection in-process with exponential + * backoff. Only when the bounded reconnect window is exhausted (or a permanent + * failure is classified) does the process exit non-zero, handing restart + * authority to systemd (Restart=on-failure). + * + * This translation unit has NO dependency on GStreamer or live SRT sockets, so + * the backoff math and state transitions are unit-tested with cmocka without + * any network. ceracoder.c wires the live socket reconnect around it. + * + * Status accessors (reconnect_is_reconnecting / reconnect_total_reconnects) + * are the "simple flag/counter" health surface consumed by the frame-liveness + * (Task 12) and health-RPC (Task 13) work. + */ + +/* Default backoff schedule. Backoff starts at BASE and doubles each attempt, + * capped at MAX: 1s -> 2s -> 4s -> 8s -> 16s -> 30s -> 30s ... */ +#define RECONNECT_DEFAULT_BASE_MS 1000u /* first backoff (1s) */ +#define RECONNECT_DEFAULT_MAX_MS 30000u /* backoff cap (30s) */ + +/* + * Default bounded reconnect window (max in-process attempts before escalating + * to systemd). Per ADR-0005 transient loss is absorbed in-process but escalates + * on window exhaustion, so the shipped default is FINITE. With the default + * schedule, 10 attempts spans ~1+2+4+8+16+30*5 = ~181s before clean exit. + * + * The mechanism is fully configurable via reconnect_init(): pass + * RECONNECT_UNLIMITED to retry transient loss forever (no escalation), or a + * finite count for a bounded window. Permanent rejects (auth/forbidden/conflict) + * bypass the window entirely and exit immediately (see + * reconnect_reason_is_permanent). + */ +#define RECONNECT_DEFAULT_MAX_ATTEMPTS 10 +#define RECONNECT_UNLIMITED 0 /* max_attempts <= 0 => unlimited */ + +typedef enum { + RECONNECT_STATE_CONNECTED = 0, /* healthy, streaming */ + RECONNECT_STATE_RECONNECTING, /* a drop occurred, retrying in-process */ + RECONNECT_STATE_FAILED /* unrecoverable: window exhausted -> exit */ +} ReconnectState; + +typedef struct { + ReconnectState state; + int attempt; /* attempts used in the CURRENT reconnect episode */ + int total_reconnects; /* cumulative SUCCESSFUL reconnects (health counter) */ + int max_attempts; /* <= 0 => unlimited; > 0 => bounded window */ + unsigned int base_backoff_ms; + unsigned int max_backoff_ms; + unsigned int last_backoff_ms; /* backoff returned for the most recent attempt */ +} ReconnectController; + +/* + * Compute the backoff (ms) for a 1-indexed attempt number: an overflow-safe + * min(base * 2^(attempt-1), max). Exposed for testing the schedule directly. + */ +unsigned int reconnect_backoff_for_attempt(unsigned int base_ms, + unsigned int max_ms, + int attempt); + +/* + * Initialize the controller in the CONNECTED state. + * base_ms / max_ms: pass 0 to use RECONNECT_DEFAULT_BASE_MS / _MAX_MS. + * max_attempts: <= 0 (RECONNECT_UNLIMITED) for unlimited transient retry, + * > 0 for a bounded window that escalates to systemd. + */ +void reconnect_init(ReconnectController *rc, unsigned int base_ms, + unsigned int max_ms, int max_attempts); + +/* + * Begin a new reconnect episode (CONNECTED/FAILED -> RECONNECTING). Resets the + * per-episode attempt counter; preserves the cumulative total_reconnects. + */ +void reconnect_begin(ReconnectController *rc); + +/* + * Advance the state machine one attempt and return how long (ms) to wait before + * that attempt. Returns -1 when the bounded window is exhausted, transitioning + * to RECONNECT_STATE_FAILED (caller must clean-exit for systemd). + */ +long reconnect_next_backoff_ms(ReconnectController *rc); + +/* + * Record a successful reconnect: RECONNECTING -> CONNECTED, reset the episode + * attempt counter, and increment the cumulative reconnect total. + */ +void reconnect_succeeded(ReconnectController *rc); + +/* Health-surface accessors (Task 12 / Task 13). */ +int reconnect_is_reconnecting(const ReconnectController *rc); +int reconnect_attempt_count(const ReconnectController *rc); +int reconnect_total_reconnects(const ReconnectController *rc); +ReconnectState reconnect_get_state(const ReconnectController *rc); + +/* + * Classify an SRT failure code as permanent (no point retrying -> exit now) vs + * transient (retry within the bounded window). `reason` is either an + * SRT_REJECT_REASON / SRT_REJX_* reject code from srt_getrejectreason(), or one + * of srt_client_connect()'s negative local-setup codes. + * + * Returns non-zero for permanent failures (auth/forbidden/conflict/version + * mismatch and similar that will not self-heal), zero otherwise. + */ +int reconnect_reason_is_permanent(int reason); + +#endif /* SRT_RECONNECT_H */ diff --git a/tests/test_frame_liveness.c b/tests/test_frame_liveness.c new file mode 100644 index 0000000..744b932 --- /dev/null +++ b/tests/test_frame_liveness.c @@ -0,0 +1,223 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +/* + * Unit tests for the frame-production liveness tracker (ADR-0005, Task 12). + * + * These exercise the PURE zombie-encode detector with an injected monotonic + * clock and no GStreamer/SRT: a healthy stream reports advancing frames, a + * stalled pipeline (process alive, frames stopped) is flagged not-advancing + * once the stall threshold elapses, and the threshold boundary is exact. Time + * is a parameter, so the N-ms timing is verified deterministically without + * sleeping. + */ + +#include +#include +#include +#include + +#include "frame_liveness.h" + +/* + * Test: a healthy stream reports advancing frames. + * + * Frames recorded at the nominal interval keep advancing_at() true across the + * whole run; the cumulative counter tracks every produced frame. + */ +static void test_healthy_stream_advancing(void **state) { + (void)state; + + FrameLiveness fl; + frame_liveness_init(&fl, FRAME_LIVENESS_DEFAULT_STALL_MS, 0); + + /* 30 fps => ~33 ms/frame, well inside the 3000 ms threshold. */ + uint64_t now = 0; + for (int i = 0; i < 300; i++) { + now += 33; + frame_liveness_record(&fl, now); + assert_true(frame_liveness_advancing_at(&fl, now)); + /* Still advancing a short time after the most recent frame. */ + assert_true(frame_liveness_advancing_at(&fl, now + 100)); + } + + assert_int_equal((int)frame_liveness_count(&fl), 300); + assert_int_equal((int)frame_liveness_last_frame_ms(&fl), (int)now); +} + +/* + * Test: a stalled pipeline is flagged not-advancing after N ms. + * + * The zombie-encode case: frames flow, then stop while the process stays alive. + * advancing_at() stays true through the threshold window, then flips false once + * the gap reaches the threshold — that is the signal the watchdog ping is gated + * on. + */ +static void test_stalled_pipeline_flagged(void **state) { + (void)state; + + const uint64_t threshold = FRAME_LIVENESS_DEFAULT_STALL_MS; /* 3000 */ + FrameLiveness fl; + frame_liveness_init(&fl, threshold, 0); + + /* Last frame produced at t=5000, then the encoder wedges. */ + uint64_t last = 5000; + frame_liveness_record(&fl, last); + assert_true(frame_liveness_advancing_at(&fl, last)); + + /* Within the window: still considered healthy. */ + assert_true(frame_liveness_advancing_at(&fl, last + 1)); + assert_true(frame_liveness_advancing_at(&fl, last + (threshold - 1))); + + /* At/after the threshold: stalled. */ + assert_false(frame_liveness_advancing_at(&fl, last + threshold)); + assert_false(frame_liveness_advancing_at(&fl, last + threshold + 1)); + assert_false(frame_liveness_advancing_at(&fl, last + 10 * threshold)); + + /* A fresh frame after the stall clears it again (recovery is observable). */ + frame_liveness_record(&fl, last + 10 * threshold); + assert_true(frame_liveness_advancing_at(&fl, last + 10 * threshold)); +} + +/* + * Test: the N-ms threshold boundary is exact. + * + * gap < threshold => advancing; gap >= threshold => stalled. Verified at the + * default and a custom threshold so the timing is not hard-coded to one value. + */ +static void test_threshold_timing_boundary(void **state) { + (void)state; + + const uint64_t thresholds[] = {FRAME_LIVENESS_DEFAULT_STALL_MS, 1000, 250, 500}; + for (size_t t = 0; t < sizeof(thresholds) / sizeof(thresholds[0]); t++) { + uint64_t th = thresholds[t]; + FrameLiveness fl; + frame_liveness_init(&fl, th, 0); + assert_int_equal((int)frame_liveness_threshold_ms(&fl), (int)th); + + uint64_t last = 1000; + frame_liveness_record(&fl, last); + + /* Largest gap that is still healthy. */ + assert_true(frame_liveness_advancing_at(&fl, last + (th - 1))); + /* Exactly the threshold is already a stall. */ + assert_false(frame_liveness_advancing_at(&fl, last + th)); + } +} + +/* + * Test: a zero threshold resolves to the shipped default. + * + * Mirrors the init convention used elsewhere (pass 0 => default), so callers + * that do not override the env knob get FRAME_LIVENESS_DEFAULT_STALL_MS. + */ +static void test_zero_threshold_uses_default(void **state) { + (void)state; + + FrameLiveness fl; + frame_liveness_init(&fl, 0, 0); + assert_int_equal((int)frame_liveness_threshold_ms(&fl), + (int)FRAME_LIVENESS_DEFAULT_STALL_MS); +} + +/* + * Test: the startup grace bounds time-to-first-frame. + * + * Before any frame is produced the init baseline acts as the "last activity" + * time, so a pipeline that never emits a first frame is flagged stalled after + * the same threshold as a mid-stream stall (no special-casing required). + */ +static void test_startup_grace_then_stall(void **state) { + (void)state; + + const uint64_t threshold = FRAME_LIVENESS_DEFAULT_STALL_MS; + FrameLiveness fl; + uint64_t start = 10000; + frame_liveness_init(&fl, threshold, start); + + assert_int_equal((int)frame_liveness_count(&fl), 0); + + /* During the grace window (no frame yet) we still report advancing so the + watchdog keeps being pet through normal startup. */ + assert_true(frame_liveness_advancing_at(&fl, start)); + assert_true(frame_liveness_advancing_at(&fl, start + (threshold - 1))); + + /* First frame never arrives: stalled once the grace window elapses. */ + assert_false(frame_liveness_advancing_at(&fl, start + threshold)); + + /* A first frame inside the grace window clears it and counts. */ + frame_liveness_record(&fl, start + 500); + assert_int_equal((int)frame_liveness_count(&fl), 1); + assert_true(frame_liveness_advancing_at(&fl, start + 500 + (threshold - 1))); +} + +/* + * Test: a non-monotonic / racing reader clock never reports a spurious stall. + * + * The producer (streaming thread) and reader (main loop) are unsynchronized; a + * now_ms sampled at or before the last recorded frame must be treated as fresh + * rather than as a giant backwards gap. + */ +static void test_non_monotonic_now_is_fresh(void **state) { + (void)state; + + FrameLiveness fl; + frame_liveness_init(&fl, FRAME_LIVENESS_DEFAULT_STALL_MS, 0); + + frame_liveness_record(&fl, 100000); + assert_true(frame_liveness_advancing_at(&fl, 100000)); /* equal */ + assert_true(frame_liveness_advancing_at(&fl, 99999)); /* earlier */ + assert_true(frame_liveness_advancing_at(&fl, 0)); /* far earlier */ +} + +/* + * Test: the frame counter is monotonic and matches the number of records. + */ +static void test_counter_monotonic(void **state) { + (void)state; + + FrameLiveness fl; + frame_liveness_init(&fl, FRAME_LIVENESS_DEFAULT_STALL_MS, 0); + assert_int_equal((int)frame_liveness_count(&fl), 0); + + uint64_t now = 0; + uint64_t prev = 0; + for (int i = 1; i <= 1000; i++) { + now += 16; + frame_liveness_record(&fl, now); + uint64_t c = frame_liveness_count(&fl); + assert_int_equal((int)c, i); + assert_true(c > prev); + prev = c; + } +} + +int main(void) { + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_healthy_stream_advancing), + cmocka_unit_test(test_stalled_pipeline_flagged), + cmocka_unit_test(test_threshold_timing_boundary), + cmocka_unit_test(test_zero_threshold_uses_default), + cmocka_unit_test(test_startup_grace_then_stall), + cmocka_unit_test(test_non_monotonic_now_is_fresh), + cmocka_unit_test(test_counter_monotonic), + }; + + return cmocka_run_group_tests(tests, NULL, NULL); +} diff --git a/tests/test_reconnect.c b/tests/test_reconnect.c new file mode 100644 index 0000000..73e4d3b --- /dev/null +++ b/tests/test_reconnect.c @@ -0,0 +1,259 @@ +/* + ceracoder - live video encoder with dynamic bitrate control + Copyright (C) 2020 BELABOX project + Copyright (C) 2026 CERALIVE + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . +*/ + +/* + * Unit tests for the SRT reconnect state machine (ADR-0005). + * + * These exercise the PURE reconnect policy with no live SRT socket or + * GStreamer pipeline: exponential backoff schedule, transient -> reconnect -> + * success, permanent failure / window exhaustion -> clean-exit signal, and the + * status flag/counter health surface. + */ + +#include +#include +#include +#include + +#include +#include + +#include "srt_reconnect.h" + +/* + * Test: exponential backoff progression (1s -> 2s -> 4s -> ... capped at 30s). + * + * Verifies the standalone schedule helper doubles each attempt and saturates at + * the configured cap without overflowing. + */ +static void test_backoff_exponential_progression(void **state) { + (void)state; + + const unsigned int base = RECONNECT_DEFAULT_BASE_MS; /* 1000 */ + const unsigned int cap = RECONNECT_DEFAULT_MAX_MS; /* 30000 */ + + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 1), 1000); + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 2), 2000); + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 3), 4000); + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 4), 8000); + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 5), 16000); + /* 32000 would exceed the cap -> saturates at 30000 */ + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 6), 30000); + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 7), 30000); + /* A large attempt count must stay clamped, never overflow */ + assert_int_equal(reconnect_backoff_for_attempt(base, cap, 1000), 30000); +} + +/* + * Test: the live controller hands out the same exponential schedule and the + * per-episode attempt counter advances monotonically. + */ +static void test_controller_backoff_sequence(void **state) { + (void)state; + + ReconnectController rc; + reconnect_init(&rc, 0, 0, RECONNECT_UNLIMITED); /* defaults, unlimited */ + reconnect_begin(&rc); + + long expected[] = {1000, 2000, 4000, 8000, 16000, 30000, 30000}; + for (size_t i = 0; i < sizeof(expected) / sizeof(expected[0]); i++) { + long backoff = reconnect_next_backoff_ms(&rc); + assert_int_equal(backoff, expected[i]); + assert_int_equal(reconnect_attempt_count(&rc), (int)(i + 1)); + assert_true(reconnect_is_reconnecting(&rc)); + } +} + +/* + * Test: transient failure -> reconnect -> success. + * + * A drop starts an episode; a couple of attempts back off, then a successful + * reconnect returns to CONNECTED, resets the episode, and bumps the cumulative + * reconnect counter (the health surface consumed by Tasks 12/13). + */ +static void test_transient_failure_then_success(void **state) { + (void)state; + + ReconnectController rc; + reconnect_init(&rc, 0, 0, RECONNECT_UNLIMITED); + assert_int_equal(reconnect_get_state(&rc), RECONNECT_STATE_CONNECTED); + assert_false(reconnect_is_reconnecting(&rc)); + assert_int_equal(reconnect_total_reconnects(&rc), 0); + + /* Transient drop -> begin reconnecting */ + reconnect_begin(&rc); + assert_true(reconnect_is_reconnecting(&rc)); + + /* First attempt backs off 1s, simulate connect failure (keep looping) */ + assert_int_equal(reconnect_next_backoff_ms(&rc), 1000); + /* Second attempt backs off 2s, simulate connect SUCCESS */ + assert_int_equal(reconnect_next_backoff_ms(&rc), 2000); + reconnect_succeeded(&rc); + + /* Back to healthy, episode reset, total incremented */ + assert_int_equal(reconnect_get_state(&rc), RECONNECT_STATE_CONNECTED); + assert_false(reconnect_is_reconnecting(&rc)); + assert_int_equal(reconnect_attempt_count(&rc), 0); + assert_int_equal(reconnect_total_reconnects(&rc), 1); + + /* A later, independent drop restarts the backoff from the base and the + * cumulative reconnect counter keeps climbing across episodes. */ + reconnect_begin(&rc); + assert_int_equal(reconnect_next_backoff_ms(&rc), 1000); + reconnect_succeeded(&rc); + assert_int_equal(reconnect_total_reconnects(&rc), 2); +} + +/* + * Test: permanent failure / window exhaustion -> clean exit. + * + * A bounded window of N attempts hands out exactly N backoffs, then returns -1 + * and transitions to FAILED. ceracoder maps that to a clean non-zero exit so + * systemd (Restart=on-failure) owns the respawn (ADR-0005). + */ +static void test_window_exhaustion_signals_clean_exit(void **state) { + (void)state; + + const int max_attempts = 3; + ReconnectController rc; + reconnect_init(&rc, 0, 0, max_attempts); + reconnect_begin(&rc); + + /* Exactly max_attempts non-negative backoffs (each a failed connect) */ + for (int i = 0; i < max_attempts; i++) { + long backoff = reconnect_next_backoff_ms(&rc); + assert_true(backoff >= 0); + assert_false(reconnect_get_state(&rc) == RECONNECT_STATE_FAILED); + } + + /* Window exhausted -> -1 and FAILED (caller must clean-exit) */ + assert_int_equal(reconnect_next_backoff_ms(&rc), -1); + assert_int_equal(reconnect_get_state(&rc), RECONNECT_STATE_FAILED); + assert_false(reconnect_is_reconnecting(&rc)); + + /* Once FAILED, the machine stays terminal */ + assert_int_equal(reconnect_next_backoff_ms(&rc), -1); + assert_int_equal(reconnect_get_state(&rc), RECONNECT_STATE_FAILED); +} + +/* + * Test: an unlimited (transient) policy never exhausts its window, so a + * persistently flaky link keeps being absorbed in-process rather than exiting. + */ +static void test_unlimited_never_exhausts(void **state) { + (void)state; + + ReconnectController rc; + reconnect_init(&rc, 0, 0, RECONNECT_UNLIMITED); + reconnect_begin(&rc); + + for (int i = 0; i < 1000; i++) { + long backoff = reconnect_next_backoff_ms(&rc); + assert_true(backoff >= 0); + assert_true(backoff <= (long)RECONNECT_DEFAULT_MAX_MS); + } + assert_false(reconnect_get_state(&rc) == RECONNECT_STATE_FAILED); + assert_true(reconnect_is_reconnecting(&rc)); +} + +/* + * Test: a successful reconnect inside a bounded window resets the attempt + * budget, so a subsequent drop gets the full window again (the window bounds a + * single uninterrupted reconnect episode, not the process lifetime). + */ +static void test_success_resets_bounded_window(void **state) { + (void)state; + + ReconnectController rc; + reconnect_init(&rc, 0, 0, 2); /* tiny window: 2 attempts */ + reconnect_begin(&rc); + + /* Burn one attempt, then succeed */ + assert_int_equal(reconnect_next_backoff_ms(&rc), 1000); + reconnect_succeeded(&rc); + assert_int_equal(reconnect_attempt_count(&rc), 0); + + /* New episode still gets the full 2-attempt window */ + reconnect_begin(&rc); + assert_true(reconnect_next_backoff_ms(&rc) >= 0); /* attempt 1 */ + assert_true(reconnect_next_backoff_ms(&rc) >= 0); /* attempt 2 */ + assert_int_equal(reconnect_next_backoff_ms(&rc), -1); /* exhausted */ + assert_int_equal(reconnect_get_state(&rc), RECONNECT_STATE_FAILED); +} + +/* + * Test: failure-reason classification. + * + * Permanent rejects (auth/forbidden/conflict/version/...) must exit immediately; + * transient ones (timeout/peer/resource and local setup errors) keep retrying. + */ +static void test_reason_classification(void **state) { + (void)state; + + /* Permanent: server/peer rejected on purpose, will not self-heal */ + assert_true(reconnect_reason_is_permanent(SRT_REJX_FORBIDDEN)); + assert_true(reconnect_reason_is_permanent(SRT_REJX_CONFLICT)); + assert_true(reconnect_reason_is_permanent(SRT_REJX_UNAUTHORIZED)); + assert_true(reconnect_reason_is_permanent(SRT_REJX_BAD_REQUEST)); + assert_true(reconnect_reason_is_permanent(SRT_REJ_VERSION)); + assert_true(reconnect_reason_is_permanent(SRT_REJ_BADSECRET)); + + /* Transient: keep retrying within the bounded window */ + assert_false(reconnect_reason_is_permanent(SRT_REJ_TIMEOUT)); + assert_false(reconnect_reason_is_permanent(SRT_REJ_PEER)); + assert_false(reconnect_reason_is_permanent(SRT_REJ_RESOURCE)); + assert_false(reconnect_reason_is_permanent(SRT_REJ_SYSTEM)); + assert_false(reconnect_reason_is_permanent(SRT_REJ_UNKNOWN)); + /* srt_client_connect()'s local-setup negative codes are transient */ + assert_false(reconnect_reason_is_permanent(-1)); /* DNS resolve */ + assert_false(reconnect_reason_is_permanent(-2)); /* socket open */ + assert_false(reconnect_reason_is_permanent(-4)); /* sockopt */ +} + +/* + * Test: a custom (non-default) backoff schedule is honored end to end. + */ +static void test_custom_backoff_bounds(void **state) { + (void)state; + + ReconnectController rc; + reconnect_init(&rc, 500, 4000, RECONNECT_UNLIMITED); /* 0.5s base, 4s cap */ + reconnect_begin(&rc); + + assert_int_equal(reconnect_next_backoff_ms(&rc), 500); + assert_int_equal(reconnect_next_backoff_ms(&rc), 1000); + assert_int_equal(reconnect_next_backoff_ms(&rc), 2000); + assert_int_equal(reconnect_next_backoff_ms(&rc), 4000); + assert_int_equal(reconnect_next_backoff_ms(&rc), 4000); /* capped */ +} + +int main(void) { + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_backoff_exponential_progression), + cmocka_unit_test(test_controller_backoff_sequence), + cmocka_unit_test(test_transient_failure_then_success), + cmocka_unit_test(test_window_exhaustion_signals_clean_exit), + cmocka_unit_test(test_unlimited_never_exhausts), + cmocka_unit_test(test_success_resets_bounded_window), + cmocka_unit_test(test_reason_classification), + cmocka_unit_test(test_custom_backoff_bounds), + }; + + return cmocka_run_group_tests(tests, NULL, NULL); +}