Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3785,6 +3785,24 @@ lifecycler:
# CLI flag: -ingester.active-queried-series-metrics-windows
[active_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]

# Experimental: Enable tracking of series queried from head only and expose them
# as metrics.
# CLI flag: -ingester.head-queried-series-metrics-enabled
[head_queried_series_metrics_enabled: <boolean> | default = false]

# Duration of each sub-window for head queried series tracking.
# CLI flag: -ingester.head-queried-series-metrics-window-duration
[head_queried_series_metrics_window_duration: <duration> | default = 15m]

# Sampling rate for head queried series tracking (1.0 = 100%).
# CLI flag: -ingester.head-queried-series-metrics-sample-rate
[head_queried_series_metrics_sample_rate: <float> | default = 1]

# Time windows to expose head queried series metrics. Also controls how long
# per-metric-name cardinality is reported after last query.
# CLI flag: -ingester.head-queried-series-metrics-windows
[head_queried_series_metrics_windows: <list of duration> | default = 2h0m0s]

# Enable uploading compacted blocks.
# CLI flag: -ingester.upload-compacted-blocks-enabled
[upload_compacted_blocks_enabled: <boolean> | default = true]
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,10 @@ Currently experimental features are:
- Ingester: Active Series Tracker
- Per-tenant `active_series_trackers` configuration in runtime config overrides
- Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric
- Ingester: Head Queried Series Metrics
- Enable on Ingester via `-ingester.head-queried-series-metrics-enabled=true`
- Tracks unique series queried from head only (not blocks) using HLL
- Tracks per-metric-name head cardinality for recently queried metrics
- `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h)
- `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size
- `-ingester.head-queried-series-metrics-sample-rate` query sampling rate
92 changes: 92 additions & 0 deletions pkg/ingester/head_queried_series_querier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package ingester

import (
"context"
"time"

"github.com/oklog/ulid/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
)

var (
	// Special ULIDs that Prometheus TSDB assigns to head-derived BlockReaders
	// (RangeHead and Head) in their Meta(); they let us distinguish head
	// blocks from persisted blocks. NOTE(review): these values must stay in
	// sync with the unexported ULIDs in the prometheus/tsdb package — confirm
	// on Prometheus upgrades.
	rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD")
	headULID      = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD")
)

// isHead reports whether the given BlockReader is a head block
// (in-order head or out-of-order range head), identified by the
// special ULIDs Prometheus TSDB assigns to head readers.
func isHead(b tsdb.BlockReader) bool {
	switch b.Meta().ULID {
	case rangeHeadULID, headULID:
		return true
	default:
		return false
	}
}

// headQueriedSeriesChunkQuerier wraps a ChunkQuerier for the head block and
// intercepts Select calls to collect series hashes (for HLL) and record
// queried metric names.
type headQueriedSeriesChunkQuerier struct {
	storage.ChunkQuerier
	// headQueriedSeries is the HLL tracker the collected series hashes feed into.
	headQueriedSeries *ActiveQueriedSeries
	// activeQueriedSeriesService processes hash batches asynchronously; may be nil,
	// in which case collected slices are returned to the pool instead.
	activeQueriedSeriesService *ActiveQueriedSeriesService
	// queriedMetricTracker records metric names from equality matchers; may be nil.
	queriedMetricTracker *QueriedMetricTracker
	// userID is the tenant the query belongs to.
	userID string
	// sampled reports whether this query was selected for series-hash collection.
	sampled bool
}

// Select delegates to the wrapped querier, records the queried metric name
// (when an equality matcher on __name__ is present), and — only for sampled
// queries — wraps the returned series set so that series label hashes are
// collected during iteration.
func (q *headQueriedSeriesChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
	inner := q.ChunkQuerier.Select(ctx, sortSeries, hints, matchers...)

	// Per-metric cardinality tracking happens on every query, independent of sampling.
	if q.queriedMetricTracker != nil {
		for _, matcher := range matchers {
			if matcher.Type == labels.MatchEqual && matcher.Name == labels.MetricName {
				q.queriedMetricTracker.MarkQueried(matcher.Value, time.Now())
				break
			}
		}
	}

	// Hash collection is paid only by sampled queries.
	if !q.sampled {
		return inner
	}
	return &headQueriedSeriesSet{
		ChunkSeriesSet:             inner,
		headQueriedSeries:          q.headQueriedSeries,
		activeQueriedSeriesService: q.activeQueriedSeriesService,
		userID:                     q.userID,
		hashes:                     getQueriedSeriesHashesSlice(),
	}
}

// headQueriedSeriesSet wraps a ChunkSeriesSet to collect series label hashes
// during iteration and flush them to the HLL tracker when iteration completes.
type headQueriedSeriesSet struct {
	storage.ChunkSeriesSet
	// headQueriedSeries is the HLL tracker that receives the collected hashes.
	headQueriedSeries *ActiveQueriedSeries
	// activeQueriedSeriesService receives the hash batch asynchronously; may be nil.
	activeQueriedSeriesService *ActiveQueriedSeriesService
	// userID is the tenant the query belongs to.
	userID string
	// hashes accumulates label hashes of iterated series; the slice is obtained
	// from a pool (see getQueriedSeriesHashesSlice).
	hashes []uint64
}

// Next advances the wrapped set, recording the label hash of every series it
// yields; once the set is exhausted, the collected hashes are flushed.
func (s *headQueriedSeriesSet) Next() bool {
	if s.ChunkSeriesSet.Next() {
		s.hashes = append(s.hashes, s.ChunkSeriesSet.At().Labels().Hash())
		return true
	}
	s.flush()
	return false
}

// At returns the current series from the wrapped set; hash collection
// happens in Next, so this is a pure delegation.
func (s *headQueriedSeriesSet) At() storage.ChunkSeries {
	return s.ChunkSeriesSet.At()
}

// flush hands the collected series hashes to the async service (which takes
// ownership of the slice), or returns the slice to the pool when no service
// is configured or nothing was collected.
//
// The slice is detached before handoff so that flush is idempotent: Prometheus
// iterators may see Next() called again after exhaustion, and without the
// detach a second flush would double-count the same hashes in the HLL and
// double-return the pooled slice.
func (s *headQueriedSeriesSet) flush() {
	hashes := s.hashes
	s.hashes = nil
	switch {
	case hashes == nil:
		// Already flushed (or never sampled) — nothing to do.
		return
	case len(hashes) > 0 && s.activeQueriedSeriesService != nil:
		// Ownership of the slice transfers to the service.
		s.activeQueriedSeriesService.UpdateSeriesBatch(s.headQueriedSeries, hashes, time.Now(), s.userID)
	default:
		// No service, or the query matched no series: recycle the slice
		// (the original leaked empty pooled slices in the latter case).
		putQueriedSeriesHashesSlice(hashes)
	}
}
212 changes: 212 additions & 0 deletions pkg/ingester/head_queried_series_querier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package ingester

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/annotations"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestIsHead verifies that exactly the special head-block ULIDs are
// classified as head blocks.
func TestIsHead(t *testing.T) {
	cases := map[string]struct {
		id   ulid.ULID
		want bool
	}{
		"rangeHead":    {id: rangeHeadULID, want: true},
		"head":         {id: headULID, want: true},
		"random block": {id: ulid.MustNew(1, nil), want: false},
	}

	for name, c := range cases {
		t.Run(name, func(t *testing.T) {
			reader := &mockBlockReader{meta: tsdb.BlockMeta{ULID: c.id}}
			assert.Equal(t, c.want, isHead(reader))
		})
	}
}

// TestHeadQueriedSeriesChunkQuerier_RecordsMetricName checks that an equality
// matcher on __name__ is recorded by the metric tracker even when the query
// is not sampled for hash collection.
func TestHeadQueriedSeriesChunkQuerier_RecordsMetricName(t *testing.T) {
	tracker := NewQueriedMetricTracker(2 * time.Hour)
	hll := NewActiveQueriedSeries([]time.Duration{2 * time.Hour}, 15*time.Minute, 1.0, nil)

	querier := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:         &noopChunkQuerier{},
		headQueriedSeries:    hll,
		queriedMetricTracker: tracker,
		userID:               "user-1",
	}

	querier.Select(
		context.Background(),
		false,
		nil,
		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "http_requests_total"),
		labels.MustNewMatcher(labels.MatchEqual, "job", "api"),
	)

	active := tracker.GetActiveMetrics(time.Now())
	require.Len(t, active, 1)
	assert.Equal(t, "http_requests_total", active[0])
}

// TestHeadQueriedSeriesChunkQuerier_SkipsRegexMetricName checks that a regex
// matcher on __name__ is NOT recorded as a queried metric name.
func TestHeadQueriedSeriesChunkQuerier_SkipsRegexMetricName(t *testing.T) {
	tracker := NewQueriedMetricTracker(2 * time.Hour)

	querier := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:         &noopChunkQuerier{},
		queriedMetricTracker: tracker,
		userID:               "user-1",
	}

	querier.Select(context.Background(), false, nil,
		labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, "http_.*"))

	assert.Empty(t, tracker.GetActiveMetrics(time.Now()))
}

// TestHeadQueriedSeriesSet_CollectsHashes verifies that a sampled query
// collects one hash per iterated series, feeds them through the async
// service into the HLL, and records the queried metric name.
func TestHeadQueriedSeriesSet_CollectsHashes(t *testing.T) {
	lbls1 := labels.FromStrings("__name__", "metric_a", "job", "foo")
	lbls2 := labels.FromStrings("__name__", "metric_a", "job", "bar")

	inner := &mockChunkSeriesSet{
		series: []storage.ChunkSeries{
			&mockChunkSeries{lbls: lbls1},
			&mockChunkSeries{lbls: lbls2},
		},
	}

	hll := NewActiveQueriedSeries(
		[]time.Duration{2 * time.Hour},
		15*time.Minute,
		1.0,
		nil,
	)

	// Use a real service to process the hashes.
	svc := NewActiveQueriedSeriesService(log.NewNopLogger(), nil)
	require.NoError(t, svc.StartAsync(context.Background()))
	defer svc.StopAsync()
	require.NoError(t, svc.AwaitRunning(context.Background()))

	tracker := NewQueriedMetricTracker(2 * time.Hour)
	wrapper := &headQueriedSeriesChunkQuerier{
		ChunkQuerier:               &mockChunkQuerierWithSeries{inner: inner},
		headQueriedSeries:          hll,
		activeQueriedSeriesService: svc,
		queriedMetricTracker:       tracker,
		userID:                     "user-1",
		sampled:                    true,
	}

	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric_a"),
	}

	ss := wrapper.Select(context.Background(), false, nil, matchers...)
	count := 0
	for ss.Next() {
		count++
	}
	assert.Equal(t, 2, count)

	// The service processes hash batches asynchronously. Poll for the result
	// instead of sleeping a fixed 50ms: a fixed sleep is flaky on a loaded CI
	// machine and wastes time on a fast one.
	require.Eventually(t, func() bool {
		estimate, err := hll.GetSeriesQueried(time.Now(), 2*time.Hour)
		return err == nil && estimate == 2
	}, 5*time.Second, 10*time.Millisecond, "expected HLL estimate of 2 queried head series")

	// Also verify metric name was recorded.
	metrics := tracker.GetActiveMetrics(time.Now())
	require.Len(t, metrics, 1)
	assert.Equal(t, "metric_a", metrics[0])
}

// Mock implementations

// mockBlockReader is a minimal tsdb.BlockReader stub; the only meaningful
// field is the block meta, which isHead inspects for its ULID. All other
// methods return zero values.
type mockBlockReader struct {
	meta tsdb.BlockMeta
}

func (m *mockBlockReader) Meta() tsdb.BlockMeta                   { return m.meta }
func (m *mockBlockReader) Index() (tsdb.IndexReader, error)       { return nil, nil }
func (m *mockBlockReader) Chunks() (tsdb.ChunkReader, error)      { return nil, nil }
func (m *mockBlockReader) Tombstones() (tombstones.Reader, error) { return nil, nil }
func (m *mockBlockReader) Size() int64                            { return 0 }
func (m *mockBlockReader) String() string                         { return "" }
func (m *mockBlockReader) MinTime() int64                         { return 0 }
func (m *mockBlockReader) MaxTime() int64                         { return 0 }

// noopChunkQuerier is a storage.ChunkQuerier stub: Select yields an empty
// series set and the label methods return nil results.
type noopChunkQuerier struct{}

func (q *noopChunkQuerier) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
	return storage.EmptyChunkSeriesSet()
}
func (q *noopChunkQuerier) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *noopChunkQuerier) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *noopChunkQuerier) Close() error { return nil }

// mockChunkQuerierWithSeries is a storage.ChunkQuerier stub whose Select
// always returns the canned inner series set, regardless of matchers.
type mockChunkQuerierWithSeries struct {
	inner *mockChunkSeriesSet
}

func (q *mockChunkQuerierWithSeries) Select(_ context.Context, _ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.ChunkSeriesSet {
	return q.inner
}
func (q *mockChunkQuerierWithSeries) LabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *mockChunkQuerierWithSeries) LabelNames(_ context.Context, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, annotations.Annotations, error) {
	return nil, nil, nil
}
func (q *mockChunkQuerierWithSeries) Close() error { return nil }

// mockChunkSeriesSet is an in-memory storage.ChunkSeriesSet backed by a
// fixed slice of series.
type mockChunkSeriesSet struct {
	series []storage.ChunkSeries
	idx    int
}

// Next advances the cursor and reports whether another series is available.
func (m *mockChunkSeriesSet) Next() bool {
	if m.idx < len(m.series) {
		m.idx++
		return true
	}
	return false
}

// At returns the series the cursor was last advanced onto.
func (m *mockChunkSeriesSet) At() storage.ChunkSeries           { return m.series[m.idx-1] }
func (m *mockChunkSeriesSet) Err() error                        { return nil }
func (m *mockChunkSeriesSet) Warnings() annotations.Annotations { return nil }

// mockChunkSeries is a storage.ChunkSeries stub that exposes only labels;
// its chunk iterator is nil and must not be used by the code under test.
type mockChunkSeries struct {
	lbls labels.Labels
}

func (m *mockChunkSeries) Labels() labels.Labels                      { return m.lbls }
func (m *mockChunkSeries) Iterator(_ chunks.Iterator) chunks.Iterator { return nil }
Loading
Loading