SVLS-8857: replace serverless metric agent with base metric agent#52267
SVLS-8857: replace serverless metric agent with base metric agent#52267apiarian-datadog wants to merge 8 commits into
Conversation
|
|
serverless-init-ci run: https://gitlab.ddbuild.io/DataDog/serverless-init-ci/-/pipelines/118912411 for 3a4cb87 which passes sufficiently |
|
this branch should hopefully be reviewable commit by commit, but the commits are also grouped into the first few making some changes to the metrics agent and the last handling the serverless-init changes so we can split this into two prs if necessary. |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a4cb87fb3
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| if s.flushOnStop { | ||
| s.log.Debug("dogstatsd_flush_on_stop: draining worker batchers before stop") | ||
| for _, w := range s.workers { | ||
| w.flushChan <- struct{}{} | ||
| } |
There was a problem hiding this comment.
Wait for DogStatsD worker flush completion
With dogstatsd_flush_on_stop enabled, this send only waits until a worker receives the flush signal, not until w.batcher.flush() has pushed that worker's samples into the demux. On serverless shutdown, Fx can then enter AgentDemultiplexer.Stop(), enqueue the drain sentinel before a slow worker enqueues its batch, and run the final flush before those samples are incorporated, dropping the final DogStatsD metrics. Please add a completion ack per worker (or otherwise wait for batcher.flush() to finish) before returning from stop().
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
this is functionality that is the same (actually a little bit better) than the existing serverless logic. so for now we'll leave this as it is and consider it for a followup ticket: https://datadoghq.atlassian.net/browse/SVLS-9353
There was a problem hiding this comment.
IIRC there is no guarantee that the flush even arrives before stop signal, never mention complete. With my proposed change in server_worker, it is a simple matter of adding a sync.WaitGroup to the server.
Go Package Import DifferencesBaseline: d3eef56
|
Files inventory check summaryFile checks results against ancestor d3eef567: Results for datadog-agent_7.82.0~devel.git.63.5c486c5.pipeline.118932721-1_amd64.deb:No change detected |
Static quality checks✅ Please find below the results from static quality gates Successful checksInfo
15 successful checks with minimal change (< 2 KiB)
|
Add two config keys (both default false, preserving existing behavior): - forwarder_stop_wait_for_inflight: when set, Worker.Stop waits for in-flight HTTP transactions to complete instead of cancelling them. - dogstatsd_flush_on_stop: when set, the DogStatsD server drains pending worker batchers into the time sampler on stop. These gate the new shutdown-flush behavior used by serverless-init. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
When forwarder_stop_wait_for_inflight is set, Worker.Stop signals the Start goroutine via a dedicated stopChan, lets in-flight HTTP requests complete, and cancels workerCtx last — so a final flush is not aborted by a racing context cancel. The outer bound remains forwarder_stop_timeout. When the flag is unset (default), Stop keeps the original cancel-immediately semantics. Tests are rewritten to drive Stop concurrently and assert the wait/cancel behavior; the testTransaction block primitive moves from shouldBlock to a release channel that models an in-flight request. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add a per-worker flushChan and a flushOnStop gate (from dogstatsd_flush_on_stop). When enabled, stop() sends once to each worker's flushChan before closing stopChan, so every worker drains its batcher into the time sampler exactly once before its run loop is torn down. Long-running agents leave the gate off and are unaffected. The legacy ServerlessFlush mechanism is left in place for now; it is removed once serverless-init stops using it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add StopTimeout to StartServerlessTraceAgentArgs (defaults to 3s when zero) so callers can bound how long Stop waits for the trace agent's Run loop to exit. serverless-init uses this to keep shutdown within the platform grace window. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Shrink ServerlessMetricAgent to a demultiplexer plus the three tag sets it applies (bundled in a new Tags struct), and add New() to construct it. The agent no longer builds its own demultiplexer/forwarder or DogStatsD server — those are now Fx-wired and injected. Add a metricstest package providing the Fx bundle used by the serverless metric tests, and update the metric/otlp tests to construct the agent on top of it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Construct the forwarder, demultiplexer and DogStatsD server via Fx and rework shutdown so the OnStop cascade (dsdServer.stop -> demux.Stop -> forwarder.Stop) drains and flushes the final samples instead of dropping them. preloadEarly applies the config overrides the pipeline reads at construction time (forwarder_stop_wait_for_inflight, dogstatsd_flush_on_stop, dogstatsd_flush_incomplete_buckets, stop timeouts), and the explicit ServerlessFlush/last-flush orchestration is replaced by the Fx lifecycle. CloudService.Shutdown now takes a nilable *ServerlessMetricAgent (nil when no API key is configured) and the ShouldForceFlushAllOnForceFlushToSerializer hook is removed. log.CreateConfig takes an explicit flush timeout. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
With serverless-init on the standard Fx pipeline, the legacy serverless-only code paths are dead. Remove them: - pkg/aggregator: ServerlessDemultiplexer (and its test) - comp/forwarder/defaultforwarder: SyncForwarder - comp/dogstatsd/server: NewServerlessServer / ServerlessDogstatsd and the ServerlessFlush interface method + serverlessFlushChan worker path - comp/dogstatsd/serverDebug: NewServerlessServerDebug - pkg/serverless: the FlushableAgent interface DogStatsD shutdown flushing now goes solely through the dogstatsd_flush_on_stop per-worker path; the old externally-driven ServerlessFlush is gone. BUILD.bazel files are updated to drop the removed sources. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
3a4cb87 to
5c486c5
Compare
Regression DetectorRegression Detector ResultsMetrics dashboard Baseline: d3eef56 Optimization Goals: ✅ No significant changes detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | quality_gate_idle | memory utilization | +0.46 | [+0.41, +0.52] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_metrics_logs | memory utilization | -0.12 | [-0.37, +0.13] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_idle_all_features | memory utilization | -0.24 | [-0.27, -0.20] | 1 | Logs bounds checks dashboard |
| ➖ | quality_gate_logs | % cpu utilization | -2.93 | [-3.98, -1.88] | 1 | Logs bounds checks dashboard |
Bounds Checks: ❌ Failed
| perf | experiment | bounds_check_name | replicates_passed | observed_value | links |
|---|---|---|---|---|---|
| ✅ | quality_gate_idle | intake_connections | 10/10 | 3 ≤ 4 | bounds checks dashboard |
| ❌ | quality_gate_idle | memory_usage | 9/10 | 147.03MiB > 147MiB | bounds checks dashboard |
| ✅ | quality_gate_idle | total_bytes_received | 10/10 | 579.57KiB ≤ 819.20KiB | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | intake_connections | 10/10 | 3 ≤ 4 | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | memory_usage | 10/10 | 485.12MiB ≤ 495MiB | bounds checks dashboard |
| ✅ | quality_gate_idle_all_features | total_bytes_received | 10/10 | 0.89MiB ≤ 1.25MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | intake_connections | 10/10 | 4 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_logs | memory_usage | 10/10 | 182.28MiB ≤ 195MiB | bounds checks dashboard |
| ✅ | quality_gate_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
| ✅ | quality_gate_logs | total_bytes_received | 10/10 | 264.21MiB ≤ 292MiB | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | cpu_usage | 10/10 | 345.78 ≤ 2000 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | intake_connections | 10/10 | 3 ≤ 6 | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | memory_usage | 10/10 | 400.97MiB ≤ 430MiB | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | missed_bytes | 10/10 | 0B = 0B | bounds checks dashboard |
| ✅ | quality_gate_metrics_logs | total_bytes_received | 10/10 | 0.86GiB ≤ 1.04GiB | bounds checks dashboard |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
Replicate Execution Details
We run multiple replicates for each experiment/variant. However, we allow replicates to be automatically retried if there are any failures, up to 8 times, at which point the replicate is marked dead and we are unable to run analysis for the entire experiment. We call each of these attempts at running replicates a replicate execution. This section lists all replicate executions that failed due to the target crashing or being oom killed.
Note: In the below tables we bucket failures by experiment, variant, and failure type. For each of these buckets we list out the replicate indexes that failed with an annotation signifying how many times said replicate failed with the given failure mode. In the below example the baseline variant of the experiment named experiment_with_failures had two replicates that failed by oom kills. Replicate 0, which failed 8 executions, and replicate 1 which failed 6 executions, all with the same failure mode.
| Experiment | Variant | Replicates | Failure | Logs | Debug Dashboard |
|---|---|---|---|---|---|
| experiment_with_failures | baseline | 0 (x8) 1 (x6) | Oom killed | Debug Dashboard |
The debug dashboard links will take you to a debugging dashboard specifically designed to investigate replicate execution failures.
❌ Retried Profiling Replicate Execution Failures (ddprof)
Note: Profiling replicas may still be executing. See the debug dashboard for up to date status.
| Experiment | Variant | Replicates | Failure | Debug Dashboard |
|---|---|---|---|---|
| quality_gate_idle | baseline | 10 | Oom killed | Debug Dashboard |
| quality_gate_idle | comparison | 10 | Oom killed | Debug Dashboard |
| quality_gate_idle_all_features | baseline | 10 | Oom killed | Debug Dashboard |
| quality_gate_idle_all_features | comparison | 10 | Oom killed | Debug Dashboard |
| quality_gate_logs | baseline | 10 | Oom killed | Debug Dashboard |
| quality_gate_logs | comparison | 10 | Oom killed | Debug Dashboard |
| quality_gate_metrics_logs | baseline | 10 | Oom killed | Debug Dashboard |
| quality_gate_metrics_logs | comparison | 10 | Oom killed | Debug Dashboard |
CI Pass/Fail Decision
❌ Failed. Some Quality Gates were violated.
- quality_gate_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check total_bytes_received: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check total_bytes_received: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check memory_usage: 9/10 replicas passed. Failed 1 which is > 0. Gate FAILED.
- quality_gate_idle, bounds check total_bytes_received: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check cpu_usage: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check missed_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_metrics_logs, bounds check total_bytes_received: 10/10 replicas passed. Gate passed.
| @@ -65,7 +71,7 @@ func (w *worker) run() { | |||
| case <-w.server.stopChan: | |||
| return | |||
There was a problem hiding this comment.
| return | |
| if s.config.GetBool("dogstatsd_flush_incomplete_buckets") { | |
| w.batcher.flush() | |
| } | |
| w.server.workerWg.Done() | |
| return |
| if s.flushOnStop { | ||
| s.log.Debug("dogstatsd_flush_on_stop: draining worker batchers before stop") | ||
| for _, w := range s.workers { | ||
| w.flushChan <- struct{}{} | ||
| } |
There was a problem hiding this comment.
IIRC there is no guarantee that the flush even arrives before stop signal, never mention complete. With my proposed change in server_worker, it is a simple matter of adding a sync.WaitGroup to the server.
| pidMap: pidMap, | ||
| cachedOriginCounters: make(map[string]cachedOriginCounter), | ||
| ServerlessMode: serverless, | ||
| flushOnStop: cfg.GetBool("dogstatsd_flush_on_stop"), |
There was a problem hiding this comment.
We can use existing setting that already did half of what you are trying to achieve (flushing the aggregator). I think it would make sense to improve it (the intent is to report everything we have), rather than introduce a new setting:
| flushOnStop: cfg.GetBool("dogstatsd_flush_on_stop"), | |
| flushOnStop: cfg.GetBool("dogstatsd_flush_incomplete_buckets"), |
| // workerCtx is cancelled immediately on Stop, which aborts any in-flight | ||
| // requests mid-transfer. Sourced from the forwarder_stop_wait_for_inflight | ||
| // config key. | ||
| waitForInflight bool |
There was a problem hiding this comment.
Can you please separate this new forwarder_stop_wait_for_inflight feature into it's own PR?
Replace the bespoke serverless metric agent lifecycle with the base metric agent. The serverless metric agent was necessary in the days when we supported aws lambda. Now that we only need to work with containerized deploys, we can switch to using the base metric agent instead.
Motivation
We want to simplify serverless-init logic and make sure that we handle shutdown correctly in environments like cloud run jobs where shutdowns happen more often.
Metrics Agent Changes
The metrics agent receives metrics packets at the listener, which are then passed to the packetsIn channel. The worker parses the packets and turns them into MetricSamples for the batcher. The batcher flushes these samples and sends them to the demultiplexer. The demultiplexer puts these samples into a channel for a separate goroutine (the time-sampler worker) to bucket up. On flush the aggregated metrics are turned into payloads, then serialized, and passed to the forwarder which packs them up into a transaction for delivery. The forwarder worker then posts those samples out to Datadog.
This PR moves serverless-init off its bespoke metrics stack (ServerlessDemultiplexer, NewServerlessServer, SyncForwarder) and onto the standard Fx forwarder/demultiplexer/DogStatsD bundle that the core agent uses. Lifecycle is now owned by Fx: the components start during fxutil.OneShot and their OnStop hooks fire in reverse construction order on shutdown (DogStatsD server → demultiplexer → forwarder). The old explicit orchestration on the metric agent — Flush, WaitForPendingSamples, and Stop — is gone; the equivalent draining now happens inside each component's stop hook, gated by config so only serverless-init opts in.
Along the way, we discovered that the standard metrics agent didn't quite meet serverless-init's needs. Serverless-init works in ephemeral environments. It doesn't need to worry about freezes like Lambda, but once the container stops, it is gone. So we need to make sure that any remaining metrics are definitely flushed when the process stops. We cannot rely on a filesystem buffer for resending metrics on restart. This happens after every single task run in Cloud Run jobs, as well as during routine app updates, refreshes, and cloud-managed restarts for the more request/response-focused workflows. Three gaps had to be closed, each behind a config gate that defaults to the existing (non-flushing) behavior so long-running agents are unaffected:
The DogStatsD server didn't flush its worker batchers on stop. The old ServerlessFlush sent a single value on a shared channel, so only one worker's batcher was flushed. We replaced that with a per-worker flush channel and a new dogstatsd_flush_on_stop setting: on stop the server sends once to every worker's channel before tearing down the worker loops, so all batched samples reach the demultiplexer.
The demultiplexer didn't drain pending samples before its final flush. This is what the old WaitForPendingSamples busy-loop was working around. The standard AgentDemultiplexer.Stop() already supports it: when dogstatsd_flush_incomplete_buckets is set, Stop first drains each time-sampler worker's sample channel (via the sentinel-nil shutdown/waitForShutdown protocol) so samples enqueued during step 1 land in the samplers, then forces a final flush to the serializer with incomplete buckets included. Both the drain and the flush are bounded together by aggregator_stop_timeout. So we no longer need a separate drain primitive — turning on the existing flag gives us a synchronous drain-then-flush, and WaitForPendingSamples was removed.
The async forwarder cancelled the worker context on stop, aborting in-flight payloads mid-transfer. We added a forwarder_stop_wait_for_inflight setting that lets those HTTP requests finish before the context is cancelled. We gated it because it introduces a behavioral change — the forwarder no longer stops immediately on Stop, it waits for in-flight transactions (bounded by forwarder_stop_timeout, after which any still-running request goroutines are left to finish on their own as the process exits). This is only appropriate for the "Stop is the last thing before the process exits" shape, which is exactly serverless-init; it may also be useful in other environments where the metrics agent is stopping and not coming back.
The net effect is that serverless-init's shutdown drains the entire pipeline end-to-end — batchers → samplers → serializer → forwarder — with each stage independently timeout-bounded so total shutdown stays inside the platform grace window (Cloud Run's 10s being the tightest).
Describe how you validated your changes
Unit testing. Running E2E tests and validating with self-monitoring and custom cloud run jobs test apps. The test apps generate 1k tasks that all stream a steady stream of metrics for a minute and then submit a large burst of metrics at the end. There are still occasionally some dropped metrics, which are confirmed to happen due to UDP packet drops at the OS level.