diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6141667b42..a4a09592db 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3785,6 +3785,24 @@ lifecycler: # CLI flag: -ingester.active-queried-series-metrics-windows [active_queried_series_metrics_windows: <list of duration> | default = 2h0m0s] +# Experimental: Enable tracking of series queried from head only and expose them +# as metrics. +# CLI flag: -ingester.head-queried-series-metrics-enabled +[head_queried_series_metrics_enabled: <boolean> | default = false] + +# Duration of each sub-window for head queried series tracking. +# CLI flag: -ingester.head-queried-series-metrics-window-duration +[head_queried_series_metrics_window_duration: <duration> | default = 15m] + +# Sampling rate for head queried series tracking (1.0 = 100%). +# CLI flag: -ingester.head-queried-series-metrics-sample-rate +[head_queried_series_metrics_sample_rate: <float> | default = 1] + +# Time windows to expose head queried series metrics. Also controls how long +# per-metric-name cardinality is reported after last query. +# CLI flag: -ingester.head-queried-series-metrics-windows +[head_queried_series_metrics_windows: <list of duration> | default = 2h0m0s] + # Enable uploading compacted blocks. 
# CLI flag: -ingester.upload-compacted-blocks-enabled
 [upload_compacted_blocks_enabled: <boolean> | default = true]
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index e52c65a8a5..fa8600bff0 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -133,3 +133,10 @@ Currently experimental features are:
 - Ingester: Active Series Tracker
   - Per-tenant `active_series_trackers` configuration in runtime config overrides
   - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric
+- Ingester: Head Queried Series Metrics
+  - Enable on Ingester via `-ingester.head-queried-series-metrics-enabled=true`
+  - Tracks unique series queried from head only (not blocks) using HLL
+  - Tracks per-metric-name head cardinality for recently queried metrics
+  - `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h)
+  - `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size
+  - `-ingester.head-queried-series-metrics-sample-rate` query sampling rate
diff --git a/pkg/ingester/head_queried_series_querier.go b/pkg/ingester/head_queried_series_querier.go
new file mode 100644
index 0000000000..a49af9cacd
--- /dev/null
+++ b/pkg/ingester/head_queried_series_querier.go
@@ -0,0 +1,107 @@
+package ingester
+
+import (
+	"context"
+	"time"
+
+	"github.com/oklog/ulid/v2"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+)
+
+var (
+	rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD")
+	headULID      = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
+)
+
+// isHead returns true if the given BlockReader is a head block (in-order or OOO).
+func isHead(b tsdb.BlockReader) bool {
+	id := b.Meta().ULID
+	return id == rangeHeadULID || id == headULID
+}
+
+// headQueriedSeriesChunkQuerier wraps a ChunkQuerier for the head block and
+// intercepts Select calls to collect series hashes (for HLL) and record
+// queried metric names.
+type headQueriedSeriesChunkQuerier struct {
+	storage.ChunkQuerier
+	headQueriedSeries          *ActiveQueriedSeries
+	activeQueriedSeriesService *ActiveQueriedSeriesService
+	queriedMetricTracker       *QueriedMetricTracker
+	userID                     string
+	sampled                    bool
+}
+
+// Select delegates to the wrapped querier, marks the first equality matcher on
+// the metric name (if any) as queried, and wraps the returned set for hash
+// collection when this querier was sampled.
+func (q *headQueriedSeriesChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
+	ss := q.ChunkQuerier.Select(ctx, sortSeries, hints, matchers...)
+
+	// Record metric name for per-metric cardinality tracking (always, regardless of sampling).
+	if q.queriedMetricTracker != nil {
+		for _, m := range matchers {
+			if m.Name == labels.MetricName && m.Type == labels.MatchEqual {
+				q.queriedMetricTracker.MarkQueried(m.Value, time.Now())
+				break
+			}
+		}
+	}
+
+	// Wrap series set for hash collection only if sampled.
+	if !q.sampled {
+		return ss
+	}
+	return &headQueriedSeriesSet{
+		ChunkSeriesSet:             ss,
+		headQueriedSeries:          q.headQueriedSeries,
+		activeQueriedSeriesService: q.activeQueriedSeriesService,
+		userID:                     q.userID,
+		hashes:                     getQueriedSeriesHashesSlice(),
+	}
+}
+
+// headQueriedSeriesSet wraps a ChunkSeriesSet to collect series label hashes
+// during iteration and flush them to the HLL tracker when iteration completes.
+type headQueriedSeriesSet struct {
+	storage.ChunkSeriesSet
+	headQueriedSeries          *ActiveQueriedSeries
+	activeQueriedSeriesService *ActiveQueriedSeriesService
+	userID                     string
+	hashes                     []uint64
+}
+
+// Next advances the wrapped set and records the label hash of each series.
+// Once the wrapped set reports no more series, the collected hashes are flushed.
+func (s *headQueriedSeriesSet) Next() bool {
+	if !s.ChunkSeriesSet.Next() {
+		s.flush()
+		return false
+	}
+	s.hashes = append(s.hashes, s.ChunkSeriesSet.At().Labels().Hash())
+	return true
+}
+
+// At returns the current series of the wrapped set.
+func (s *headQueriedSeriesSet) At() storage.ChunkSeries {
+	return s.ChunkSeriesSet.At()
+}
+
+// flush hands the collected hashes off at most once. With hashes to report and
+// a service configured, the batch goes to the service; otherwise the slice is
+// returned to the pool. The reference is dropped first so that a repeated
+// Next() after exhaustion can neither resubmit nor double-release the slice.
+func (s *headQueriedSeriesSet) flush() {
+	hashes := s.hashes
+	s.hashes = nil
+	if hashes == nil {
+		return
+	}
+	if len(hashes) > 0 && s.activeQueriedSeriesService != nil {
+		s.activeQueriedSeriesService.UpdateSeriesBatch(s.headQueriedSeries, hashes, time.Now(), s.userID)
+		return
+	}
+	// Nothing to report, or no service to report to: return the slice to the pool.
+	putQueriedSeriesHashesSlice(hashes)
+}
diff --git a/pkg/ingester/head_queried_series_querier_test.go b/pkg/ingester/head_queried_series_querier_test.go
new file mode 100644
index 0000000000..5a563f20e4
--- /dev/null
+++ b/pkg/ingester/head_queried_series_querier_test.go
@@ -0,0 +1,212 @@
+package ingester
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/oklog/ulid/v2"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/tombstones"
+	"github.com/prometheus/prometheus/util/annotations"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIsHead(t *testing.T) {
+	tests := []struct {
+		name     string
+		ulid     ulid.ULID
+		expected bool
+	}{
+		{"rangeHead", rangeHeadULID, true},
+		{"head", headULID, true},
+		{"random block", ulid.MustNew(1, nil), false},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &mockBlockReader{meta: tsdb.BlockMeta{ULID: tc.ulid}}
+			assert.Equal(t, tc.expected, isHead(b))
+		})
+	}
+}
+
+func
TestHeadQueriedSeriesChunkQuerier_RecordsMetricName(t *testing.T) {
+	tracker := NewQueriedMetricTracker(2 * time.Hour)
+	hll := NewActiveQueriedSeries(
+		[]time.Duration{2 * time.Hour},
+		15*time.Minute,
+		1.0,
+		nil,
+	)
+
+	wrapper := &headQueriedSeriesChunkQuerier{
+		ChunkQuerier:               &noopChunkQuerier{},
+		headQueriedSeries:          hll,
+		activeQueriedSeriesService: nil,
+		queriedMetricTracker:       tracker,
+		userID:                     "user-1",
+		sampled:                    false,
+	}
+
+	matchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "http_requests_total"),
+		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
+	}
+
+	wrapper.Select(context.Background(), false, nil, matchers...)
+
+	now := time.Now()
+	metrics := tracker.GetActiveMetrics(now)
+	require.Len(t, metrics, 1)
+	assert.Equal(t, "http_requests_total", metrics[0])
+}
+
+func TestHeadQueriedSeriesChunkQuerier_SkipsRegexMetricName(t *testing.T) {
+	tracker := NewQueriedMetricTracker(2 * time.Hour)
+
+	wrapper := &headQueriedSeriesChunkQuerier{
+		ChunkQuerier:         &noopChunkQuerier{},
+		queriedMetricTracker: tracker,
+		userID:               "user-1",
+		sampled:              false,
+	}
+
+	matchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "http_.*"),
+	}
+
+	wrapper.Select(context.Background(), false, nil, matchers...)
+
+	metrics := tracker.GetActiveMetrics(time.Now())
+	assert.Empty(t, metrics)
+}
+
+func TestHeadQueriedSeriesSet_CollectsHashes(t *testing.T) {
+	lbls1 := labels.FromStrings("__name__", "metric_a", "job", "foo")
+	lbls2 := labels.FromStrings("__name__", "metric_a", "job", "bar")
+
+	inner := &mockChunkSeriesSet{
+		series: []storage.ChunkSeries{
+			&mockChunkSeries{lbls: lbls1},
+			&mockChunkSeries{lbls: lbls2},
+		},
+	}
+
+	hll := NewActiveQueriedSeries(
+		[]time.Duration{2 * time.Hour},
+		15*time.Minute,
+		1.0,
+		nil,
+	)
+
+	// Use a real service to process the hashes.
+	svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
+	require.NoError(t, svc.StartAsync(context.Background()))
+	defer svc.StopAsync()
+	require.NoError(t, svc.AwaitRunning(context.Background()))
+
+	tracker := NewQueriedMetricTracker(2 * time.Hour)
+	wrapper := &headQueriedSeriesChunkQuerier{
+		ChunkQuerier:               &mockChunkQuerierWithSeries{inner: inner},
+		headQueriedSeries:          hll,
+		activeQueriedSeriesService: svc,
+		queriedMetricTracker:       tracker,
+		userID:                     "user-1",
+		sampled:                    true,
+	}
+
+	matchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric_a"),
+	}
+
+	ss := wrapper.Select(context.Background(), false, nil, matchers...)
+	count := 0
+	for ss.Next() {
+		count++
+	}
+	assert.Equal(t, 2, count)
+
+	// Poll for the async worker instead of sleeping a fixed time, which is
+	// flaky on slow machines and wastes time on fast ones.
+	require.Eventually(t, func() bool {
+		estimate, err := hll.GetSeriesQueried(time.Now(), 2*time.Hour)
+		return err == nil && estimate == 2
+	}, 5*time.Second, 10*time.Millisecond)
+
+	// Also verify metric name was recorded.
+	metrics := tracker.GetActiveMetrics(time.Now())
+	require.Len(t, metrics, 1)
+	assert.Equal(t, "metric_a", metrics[0])
+}
+
+// Mock implementations
+
+type mockBlockReader struct {
+	meta tsdb.BlockMeta
+}
+
+func (m *mockBlockReader) Meta() tsdb.BlockMeta                   { return m.meta }
+func (m *mockBlockReader) Index() (tsdb.IndexReader, error)       { return nil, nil }
+func (m *mockBlockReader) Chunks() (tsdb.ChunkReader, error)      { return nil, nil }
+func (m *mockBlockReader) Tombstones() (tombstones.Reader, error) { return nil, nil }
+func (m *mockBlockReader) Size() int64                            { return 0 }
+func (m *mockBlockReader) String() string                         { return "" }
+func (m *mockBlockReader) MinTime() int64                         { return 0 }
+func (m *mockBlockReader) MaxTime() int64                         { return 0 }
+
+type noopChunkQuerier struct{}
+
+func (q *noopChunkQuerier) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
+	return storage.EmptyChunkSeriesSet()
+}
+func (q *noopChunkQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *noopChunkQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *noopChunkQuerier) Close() error { return nil }
+
+type mockChunkQuerierWithSeries struct {
+	inner *mockChunkSeriesSet
+}
+
+func (q *mockChunkQuerierWithSeries) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
+	return q.inner
+}
+func (q *mockChunkQuerierWithSeries) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *mockChunkQuerierWithSeries) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
+	return nil, nil, nil
+}
+func (q *mockChunkQuerierWithSeries) Close() error { return nil } + +type mockChunkSeriesSet struct { + series []storage.ChunkSeries + idx int +} + +func (m *mockChunkSeriesSet) Next() bool { + if m.idx >= len(m.series) { + return false + } + m.idx++ + return true +} +func (m *mockChunkSeriesSet) At() storage.ChunkSeries { return m.series[m.idx-1] } +func (m *mockChunkSeriesSet) Err() error { return nil } +func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil } + +type mockChunkSeries struct { + lbls labels.Labels +} + +func (m *mockChunkSeries) Labels() labels.Labels { return m.lbls } +func (m *mockChunkSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return nil } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 6116a31899..8938589ea9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -136,6 +136,11 @@ type Config struct { ActiveQueriedSeriesMetricsSampleRate float64 `yaml:"active_queried_series_metrics_sample_rate"` ActiveQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"active_queried_series_metrics_windows"` + HeadQueriedSeriesMetricsEnabled bool `yaml:"head_queried_series_metrics_enabled"` + HeadQueriedSeriesMetricsWindowDuration time.Duration `yaml:"head_queried_series_metrics_window_duration"` + HeadQueriedSeriesMetricsSampleRate float64 `yaml:"head_queried_series_metrics_sample_rate"` + HeadQueriedSeriesMetricsWindows cortex_tsdb.DurationList `yaml:"head_queried_series_metrics_windows"` + // Use blocks storage. BlocksStorageConfig cortex_tsdb.BlocksStorageConfig `yaml:"-"` @@ -202,6 +207,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ActiveQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{2 * time.Hour} f.Var(&cfg.ActiveQueriedSeriesMetricsWindows, "ingester.active-queried-series-metrics-windows", "Time windows to expose queried series metric. 
Each window tracks queried series within that time period.") + f.BoolVar(&cfg.HeadQueriedSeriesMetricsEnabled, "ingester.head-queried-series-metrics-enabled", false, "Experimental: Enable tracking of series queried from head only and expose them as metrics.") + f.DurationVar(&cfg.HeadQueriedSeriesMetricsWindowDuration, "ingester.head-queried-series-metrics-window-duration", 15*time.Minute, "Duration of each sub-window for head queried series tracking.") + f.Float64Var(&cfg.HeadQueriedSeriesMetricsSampleRate, "ingester.head-queried-series-metrics-sample-rate", 1.0, "Sampling rate for head queried series tracking (1.0 = 100%).") + cfg.HeadQueriedSeriesMetricsWindows = cortex_tsdb.DurationList{2 * time.Hour} + f.Var(&cfg.HeadQueriedSeriesMetricsWindows, "ingester.head-queried-series-metrics-windows", "Time windows to expose head queried series metrics. Also controls how long per-metric-name cardinality is reported after last query.") + f.BoolVar(&cfg.UploadCompactedBlocksEnabled, "ingester.upload-compacted-blocks-enabled", true, "Enable uploading compacted blocks.") f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. 
Does not affect max-series-per-user or max-global-series-per-metric limits.") f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors") @@ -363,13 +374,15 @@ func (r tsdbCloseCheckResult) shouldClose() bool { } type userTSDB struct { - db *tsdb.DB - userID string - activeSeries *ActiveSeries - activeQueriedSeries *ActiveQueriedSeries - seriesInMetric *metricCounter - labelSetCounter *labelSetCounter - limiter *Limiter + db *tsdb.DB + userID string + activeSeries *ActiveSeries + activeQueriedSeries *ActiveQueriedSeries + headQueriedSeries *ActiveQueriedSeries + queriedMetricTracker *QueriedMetricTracker + seriesInMetric *metricCounter + labelSetCounter *labelSetCounter + limiter *Limiter instanceSeriesCount *atomic.Int64 // Shared across all userTSDB instances created by ingester. instanceLimitsFn func() *InstanceLimits @@ -820,6 +833,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe false, cfg.ActiveSeriesMetricsEnabled, cfg.ActiveQueriedSeriesMetricsEnabled, + cfg.HeadQueriedSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.maxInflightPushRequests, @@ -918,6 +932,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe false, false, false, + false, i.getInstanceLimits, nil, &i.maxInflightPushRequests, @@ -1058,6 +1073,13 @@ func (i *Ingester) updateLoop(ctx context.Context) error { defer t.Stop() } + var headQueriedSeriesTickerChan <-chan time.Time + if i.cfg.HeadQueriedSeriesMetricsEnabled { + t := time.NewTicker(i.cfg.ActiveQueriedSeriesMetricsUpdatePeriod) + headQueriedSeriesTickerChan = t.C + defer t.Stop() + } + // Similarly to the above, this is a hardcoded value. 
metadataPurgeTicker := time.NewTicker(metadataPurgePeriod)
 	defer metadataPurgeTicker.Stop()
@@ -1086,6 +1108,8 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
 			i.updateActiveSeries(ctx)
 		case <-activeQueriedSeriesTickerChan:
 			i.updateActiveQueriedSeries(ctx)
+		case <-headQueriedSeriesTickerChan:
+			i.updateHeadQueriedMetrics(ctx)
 		case <-maxTrackerResetTicker.C:
 			i.maxInflightQueryRequests.Tick()
 			i.maxInflightPushRequests.Tick()
@@ -1188,6 +1212,46 @@ func (i *Ingester) updateActiveQueriedSeries(ctx context.Context) {
 	}
 }
 
+// updateHeadQueriedMetrics refreshes the head queried series gauges for every
+// tenant: the estimated number of series queried from head per configured
+// window, and the current head series count of each recently queried metric name.
+func (i *Ingester) updateHeadQueriedMetrics(ctx context.Context) {
+	now := time.Now()
+	for _, userID := range i.getTSDBUsers() {
+		userDB, err := i.getTSDB(userID)
+		if err != nil || userDB == nil {
+			continue
+		}
+
+		// Metric 1: total series queried from head (HLL)
+		if userDB.headQueriedSeries != nil {
+			userDB.headQueriedSeries.Purge(now)
+			for _, windowDuration := range i.cfg.HeadQueriedSeriesMetricsWindows {
+				estimatedCount, err := userDB.headQueriedSeries.GetSeriesQueried(now, windowDuration)
+				if err != nil {
+					level.Error(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to get head queried series count", "user", userID, "window", windowDuration, "err", err)
+					continue
+				}
+				i.metrics.headQueriedSeriesPerUser.WithLabelValues(userID, windowDuration.String()).Set(float64(estimatedCount))
+			}
+		}
+
+		// Metric 2: per metric name cardinality
+		if userDB.queriedMetricTracker != nil {
+			activeMetrics := userDB.queriedMetricTracker.GetActiveMetrics(now)
+			// Drop this user's series before re-setting the active ones; otherwise a
+			// metric name that expired from the tracker is exported forever at its
+			// last value. A scrape landing between the delete and the Set calls may
+			// briefly miss these series.
+			i.metrics.headQueriedMetricSeries.DeletePartialMatch(prometheus.Labels{"user": userID})
+			for _, metricName := range activeMetrics {
+				count := userDB.seriesInMetric.getSeriesCountForMetric(metricName)
+				i.metrics.headQueriedMetricSeries.WithLabelValues(userID, metricName).Set(float64(count))
+			}
+		}
+	}
+}
+
 func (i *Ingester) updateLabelSetMetrics() {
 	activeUserSet := make(map[string]map[uint64]struct{})
 	for _, userID := range i.getTSDBUsers() {
@@ -2870,11 +2934,29 @@ func (i *Ingester) blockChunkQuerierFunc(userId string) tsdb.BlockChunkQuerierFu
 		// This occurs because the tsdb.PostingsForMatchers function can return invalid data in such scenarios.
 		// For more details, see: https://github.com/cortexproject/cortex/issues/6556
 		// TODO: alanprot: Consider removing this logic when prometheus is updated as this logic is "fixed" upstream.
+		var q storage.ChunkQuerier
 		if postingCache == nil || mint > db.Head().MaxTime() {
-			return tsdb.NewBlockChunkQuerier(b, mint, maxt)
+			q, err = tsdb.NewBlockChunkQuerier(b, mint, maxt)
+		} else {
+			q, err = cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
+		}
+		if err != nil {
+			return nil, err
 		}
 
-		return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt)
+		// Wrap only for head queriers when head queried series metrics are enabled.
+		if i.cfg.HeadQueriedSeriesMetricsEnabled && isHead(b) && db != nil && db.headQueriedSeries != nil {
+			q = &headQueriedSeriesChunkQuerier{
+				ChunkQuerier:               q,
+				headQueriedSeries:          db.headQueriedSeries,
+				activeQueriedSeriesService: i.activeQueriedSeriesService,
+				queriedMetricTracker:       db.queriedMetricTracker,
+				userID:                     userId,
+				sampled:                    db.headQueriedSeries.SampleRequest(),
+			}
+		}
+
+		return q, nil
 	}
 }
 
@@ -2901,15 +2983,36 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		)
 	}
 
+	var headQueriedSeries *ActiveQueriedSeries
+	var queriedMetricTracker *QueriedMetricTracker
+	if i.cfg.HeadQueriedSeriesMetricsEnabled {
+		headQueriedSeries = NewActiveQueriedSeries(
+			i.cfg.HeadQueriedSeriesMetricsWindows,
+			i.cfg.HeadQueriedSeriesMetricsWindowDuration,
+			i.cfg.HeadQueriedSeriesMetricsSampleRate,
+			i.logger,
+		)
+		// Use the largest configured window as the expiry for per-metric-name tracking.
+ maxWindow := i.cfg.HeadQueriedSeriesMetricsWindows[0] + for _, w := range i.cfg.HeadQueriedSeriesMetricsWindows { + if w > maxWindow { + maxWindow = w + } + } + queriedMetricTracker = NewQueriedMetricTracker(maxWindow) + } + userDB := &userTSDB{ - userID: userID, - activeSeries: NewActiveSeries(), - activeQueriedSeries: activeQueriedSeries, - seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), - labelSetCounter: newLabelSetCounter(i.limiter), - trackerCounter: newTrackerCounter(), - ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), - ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), + userID: userID, + activeSeries: NewActiveSeries(), + activeQueriedSeries: activeQueriedSeries, + headQueriedSeries: headQueriedSeries, + queriedMetricTracker: queriedMetricTracker, + seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), + labelSetCounter: newLabelSetCounter(i.limiter), + trackerCounter: newTrackerCounter(), + ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), + ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), instanceLimitsFn: i.getInstanceLimits, instanceSeriesCount: &i.TSDBState.seriesCount, diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 238c578e65..cdb58187af 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -58,6 +58,8 @@ type ingesterMetrics struct { activeSeriesPerUser *prometheus.GaugeVec activeNHSeriesPerUser *prometheus.GaugeVec activeQueriedSeriesPerUser *prometheus.GaugeVec + headQueriedSeriesPerUser *prometheus.GaugeVec + headQueriedMetricSeries *prometheus.GaugeVec limitsPerLabelSet *prometheus.GaugeVec usagePerLabelSet *prometheus.GaugeVec activeSeriesPerTracker *prometheus.GaugeVec @@ -87,6 +89,7 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, activeQueriedSeriesEnabled bool, + 
headQueriedSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightPushRequests *util_math.MaxTracker, @@ -309,6 +312,16 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_active_queried_series", Help: "Estimated number of currently active queried series per user (probabilistic count using HyperLogLog).", }, []string{"user", "window"}), + + // Not registered automatically, but only if headQueriedSeriesEnabled is true. + headQueriedSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_queried_series_from_head", + Help: "Estimated number of unique series queried from head within the configured time window.", + }, []string{"user", "window"}), + headQueriedMetricSeries: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_queried_metric_series_in_head", + Help: "Current head cardinality for metric names queried within the configured time window.", + }, []string{"user", "metric_name"}), } if regexMatcherLimitsEnabled { @@ -356,6 +369,11 @@ func newIngesterMetrics(r prometheus.Registerer, r.MustRegister(m.activeQueriedSeriesPerUser) } + if headQueriedSeriesEnabled && r != nil { + r.MustRegister(m.headQueriedSeriesPerUser) + r.MustRegister(m.headQueriedMetricSeries) + } + if createMetricsConflictingWithTSDB { m.memSeriesCreatedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: memSeriesCreatedTotalName, @@ -382,6 +400,8 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) { m.activeNHSeriesPerUser.DeleteLabelValues(userID) m.activeSeriesPerTracker.DeletePartialMatch(prometheus.Labels{"user": userID}) m.activeQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) + m.headQueriedSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) + m.headQueriedMetricSeries.DeletePartialMatch(prometheus.Labels{"user": userID}) m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) 
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID}) m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID}) diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 30d0c4cd5c..378ada9cdf 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -20,7 +20,7 @@ func TestRegexMatcherLimitsMetricsFeatureFlag(t *testing.T) { // Test with feature flag disabled - metrics should be nil t.Run("metrics are nil when feature flag is disabled", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, false) @@ -33,7 +33,7 @@ func TestRegexMatcherLimitsMetricsFeatureFlag(t *testing.T) { // Test with feature flag enabled - metrics should be initialized t.Run("metrics are initialized when feature flag is enabled", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -51,7 +51,7 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) { t.Run("rejected metric increments correctly", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -75,7 +75,7 @@ func TestUnoptimizedRegexRejectedMetric(t *testing.T) { t.Run("metric cleanup works correctly", func(t *testing.T) { reg := prometheus.NewRegistry() - m := newIngesterMetrics(reg, false, false, false, + m := newIngesterMetrics(reg, false, false, 
false, false, func() *InstanceLimits { return &InstanceLimits{} }, ingestionRate, &inflightPushRequests, &maxInflightQueryRequests, false, true) @@ -110,6 +110,7 @@ func TestIngesterMetrics(t *testing.T) { false, true, false, + false, func() *InstanceLimits { return &InstanceLimits{ MaxIngestionRate: 12, diff --git a/pkg/ingester/queried_metric_tracker.go b/pkg/ingester/queried_metric_tracker.go new file mode 100644 index 0000000000..529f40f723 --- /dev/null +++ b/pkg/ingester/queried_metric_tracker.go @@ -0,0 +1,46 @@ +package ingester + +import ( + "sync" + "time" +) + +// QueriedMetricTracker tracks which metric names have been queried from the head +// within a configurable time window. On periodic collection, active entries are +// used to look up current head cardinality via seriesInMetric. +type QueriedMetricTracker struct { + mu sync.Mutex + entries map[string]time.Time // metric_name -> last queried time + window time.Duration +} + +// NewQueriedMetricTracker creates a new tracker with the given expiry window. +func NewQueriedMetricTracker(window time.Duration) *QueriedMetricTracker { + return &QueriedMetricTracker{ + entries: make(map[string]time.Time), + window: window, + } +} + +// MarkQueried records that a metric name was queried. Called on the query path. +// Repeated calls for the same metric name just update the timestamp. +func (t *QueriedMetricTracker) MarkQueried(metricName string, now time.Time) { + t.mu.Lock() + t.entries[metricName] = now + t.mu.Unlock() +} + +// GetActiveMetrics returns metric names queried within the window and purges expired entries. 
+func (t *QueriedMetricTracker) GetActiveMetrics(now time.Time) []string { + t.mu.Lock() + defer t.mu.Unlock() + var result []string + for name, lastQueried := range t.entries { + if now.Sub(lastQueried) <= t.window { + result = append(result, name) + } else { + delete(t.entries, name) + } + } + return result +} diff --git a/pkg/ingester/queried_metric_tracker_test.go b/pkg/ingester/queried_metric_tracker_test.go new file mode 100644 index 0000000000..6dbfd841bc --- /dev/null +++ b/pkg/ingester/queried_metric_tracker_test.go @@ -0,0 +1,79 @@ +package ingester + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestQueriedMetricTracker_MarkQueried(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + tracker.MarkQueried("node_cpu_seconds_total", now) + + metrics := tracker.GetActiveMetrics(now) + sort.Strings(metrics) + assert.Equal(t, []string{"http_requests_total", "node_cpu_seconds_total"}, metrics) +} + +func TestQueriedMetricTracker_OverwritesTimestamp(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now.Add(-90*time.Minute)) + tracker.MarkQueried("http_requests_total", now) + + // Should still be active since last query was at now + metrics := tracker.GetActiveMetrics(now.Add(90 * time.Minute)) + assert.Equal(t, []string{"http_requests_total"}, metrics) +} + +func TestQueriedMetricTracker_ExpiresEntries(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + tracker.MarkQueried("old_metric", now.Add(-3*time.Hour)) + + metrics := tracker.GetActiveMetrics(now) + assert.Equal(t, []string{"http_requests_total"}, metrics) + + // Verify expired entry was purged + metrics = tracker.GetActiveMetrics(now) + assert.Equal(t, 
[]string{"http_requests_total"}, metrics) +} + +func TestQueriedMetricTracker_EmptyAfterExpiry(t *testing.T) { + tracker := NewQueriedMetricTracker(1 * time.Minute) + now := time.Now() + + tracker.MarkQueried("http_requests_total", now) + + metrics := tracker.GetActiveMetrics(now.Add(2 * time.Minute)) + assert.Empty(t, metrics) +} + +func TestQueriedMetricTracker_ConcurrentAccess(t *testing.T) { + tracker := NewQueriedMetricTracker(2 * time.Hour) + now := time.Now() + + var wg sync.WaitGroup + for i := range 100 { + wg.Add(1) + go func(i int) { + defer wg.Done() + tracker.MarkQueried("metric", now) + }(i) + } + wg.Wait() + + metrics := tracker.GetActiveMetrics(now) + require.Len(t, metrics, 1) + assert.Equal(t, "metric", metrics[0]) +} diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go index fa70e090c3..0946e4869c 100644 --- a/pkg/ingester/user_metrics_metadata_test.go +++ b/pkg/ingester/user_metrics_metadata_test.go @@ -26,6 +26,7 @@ func Test_UserMetricsMetadata(t *testing.T) { false, false, false, + false, func() *InstanceLimits { return &InstanceLimits{} }, diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 2918c8993a..3a25781302 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -86,6 +86,13 @@ func (m *metricCounter) increaseSeriesForMetric(metric string) { shard.mtx.Unlock() } +func (m *metricCounter) getSeriesCountForMetric(metric string) int { + shard := m.getShard(metric) + shard.mtx.Lock() + defer shard.mtx.Unlock() + return shard.m[metric] +} + type labelSetCounterEntry struct { count int labels labels.Labels diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 915c689d15..385a48f2e0 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4620,6 +4620,34 @@ "type": "boolean", "x-cli-flag": "ingester.enable-regex-matcher-limits" }, + "head_queried_series_metrics_enabled": { + "default": 
false, + "description": "Experimental: Enable tracking of series queried from head only and expose them as metrics.", + "type": "boolean", + "x-cli-flag": "ingester.head-queried-series-metrics-enabled" + }, + "head_queried_series_metrics_sample_rate": { + "default": 1, + "description": "Sampling rate for head queried series tracking (1.0 = 100%).", + "type": "number", + "x-cli-flag": "ingester.head-queried-series-metrics-sample-rate" + }, + "head_queried_series_metrics_window_duration": { + "default": "15m0s", + "description": "Duration of each sub-window for head queried series tracking.", + "type": "string", + "x-cli-flag": "ingester.head-queried-series-metrics-window-duration", + "x-format": "duration" + }, + "head_queried_series_metrics_windows": { + "default": "2h0m0s", + "description": "Time windows to expose head queried series metrics. Also controls how long per-metric-name cardinality is reported after last query.", + "items": { + "type": "string" + }, + "type": "array", + "x-cli-flag": "ingester.head-queried-series-metrics-windows" + }, "ignore_series_limit_for_metric_names": { "description": "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.", "type": "string",