diff --git a/CHANGELOG.md b/CHANGELOG.md
index f47c4ed89e..0203202efd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 * [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
 * [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
 * [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
+* [CHANGE] Ingester: Add experimental `-ingester.local-limit-cache-enabled` flag to prevent false per-user series limit throttling during ingester scale-up. When enabled, the local limit (derived from `global_limit / num_ingesters * replication_factor`) is cached per tenant and prevented from shrinking when the global limit has not changed. This may cause temporary over-commitment of the global series limit until head compaction (~2h). #7491
 * [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
 * [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
 * [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 6141667b42..16b4b384a4 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3861,6 +3861,13 @@ instance_limits:
 # CLI flag: -ingester.enable-regex-matcher-limits
 [enable_regex_matcher_limits: <boolean> | default = false]
 
+# [Experimental] When enabled, the per-ingester local series limit is cached and
+# prevented from shrinking during ring topology changes if the global limit has
+# not changed. This prevents false throttling during ingester scale-up at the
+# cost of potential temporary over-commitment until head compaction.
+# CLI flag: -ingester.local-limit-cache-enabled
+[local_limit_cache_enabled: <boolean> | default = false]
+
 query_protection:
   rejection:
     threshold:
diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md
index e52c65a8a5..1b95d045ec 100644
--- a/docs/configuration/v1-guarantees.md
+++ b/docs/configuration/v1-guarantees.md
@@ -130,6 +130,9 @@ Currently experimental features are:
 - `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality
 - `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes
 - HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache.
+- Ingester: Local Limit Cache
+  - Enable local limit caching during ring topology changes via `-ingester.local-limit-cache-enabled=true`
+  - When enabled, the per-ingester local series limit is prevented from shrinking during scale-up if the global limit has not changed. This avoids false throttling but may cause temporary over-commitment until head compaction (~2h).
 - Ingester: Active Series Tracker
   - Per-tenant `active_series_trackers` configuration in runtime config overrides
   - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric
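The arithmetic behind the new flag, as a standalone sketch rather than anything in the patch: the helper below only evaluates the documented `global_limit / num_ingesters * replication_factor` derivation (ignoring the zone-awareness and shuffle-sharding clamping the real limiter also applies), using the same figures as the new limiter tests further down.

```go
package main

import "fmt"

// localLimit mirrors the documented derivation: global_limit / num_ingesters * replication_factor.
func localLimit(globalLimit, numIngesters, replicationFactor int) int {
	return int(float64(globalLimit) / float64(numIngesters) * float64(replicationFactor))
}

func main() {
	const globalLimit, rf = 2_700_000, 3

	before := localLimit(globalLimit, 75, rf) // 108000 series allowed on each ingester
	after := localLimit(globalLimit, 249, rf) // 32530 series allowed on each ingester

	// Immediately after the scale-up each pre-existing ingester still holds roughly
	// 108000 series, because series only leave its TSDB head at the next head
	// compaction. Without the cache the enforced limit drops to 32530 at once and
	// pushes are falsely throttled; with the cache the old value is kept until the
	// global limit changes or the cache is reset after compaction.
	fmt.Println(before, after)
}
```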
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 6116a31899..d59c7a9115 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -180,6 +180,12 @@ type Config struct {
 	// for unoptimized regex matchers, and enforce per-tenant limits if configured.
 	EnableRegexMatcherLimits bool `yaml:"enable_regex_matcher_limits"`
 
+	// LocalLimitCacheEnabled prevents the per-ingester local series limit from shrinking
+	// during ring topology changes (e.g., ingester scale-up). When enabled, the limiter
+	// caches the previous local limit per tenant and prevents it from decreasing when the
+	// global limit has not changed. The cache is reset per-tenant after head compaction.
+	LocalLimitCacheEnabled bool `yaml:"local_limit_cache_enabled"`
+
 	QueryProtection configs.QueryProtection `yaml:"query_protection"`
 }
 
@@ -212,6 +218,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")
 	f.BoolVar(&cfg.EnableMatcherOptimization, "ingester.enable-matcher-optimization", false, "Enable optimization of label matchers when query chunks. When enabled, matchers with low selectivity such as =~.+ are applied lazily during series scanning instead of being used for postings matching.")
 	f.BoolVar(&cfg.EnableRegexMatcherLimits, "ingester.enable-regex-matcher-limits", false, "Enable regex matcher limits and metrics collection for unoptimized regex queries. When enabled, the ingester will track pattern length, label cardinality, and total value length for unoptimized regex matchers.")
+	f.BoolVar(&cfg.LocalLimitCacheEnabled, "ingester.local-limit-cache-enabled", false, "[Experimental] When enabled, the per-ingester local series limit is cached and prevented from shrinking during ring topology changes if the global limit has not changed. This prevents false throttling during ingester scale-up at the cost of potential temporary over-commitment until head compaction.")
 	cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
 	cfg.QueryProtection.RegisterFlagsWithPrefix(f, "ingester.")
 }
@@ -860,6 +867,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
 		cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
 		cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
 		cfg.AdminLimitMessage,
+		cfg.LocalLimitCacheEnabled,
 	)
 
 	i.TSDBState.shipperIngesterID = i.lifecycler.ID
@@ -913,6 +921,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
 		cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
 		cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
 		cfg.AdminLimitMessage,
+		cfg.LocalLimitCacheEnabled,
 	)
 
 	i.metrics = newIngesterMetrics(registerer, false,
@@ -3371,6 +3380,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *users
 			level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err, "compactReason", reason)
 		} else {
 			level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "TSDB blocks compaction completed successfully", "user", userID, "compactReason", reason)
+			// Reset the local limit cache after successful compaction.
+			// Idle series (including those resharded to other ingesters) are now
+			// flushed from the head, so the series count reflects true ownership.
+			i.limiter.ResetLocalLimitCache(userID)
 		}
 
 		return nil
@@ -3471,6 +3484,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
 
 	i.deleteUserMetadata(userID)
 	i.metrics.deletePerUserMetrics(userID)
+	i.limiter.ResetLocalLimitCache(userID)
 
 	validation.DeletePerUserValidationMetrics(i.validateMetrics, userID, i.logger)
 
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index 69920f7622..886a16b929 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -3,6 +3,7 @@ package ingester
 import (
 	"fmt"
 	"math"
+	"sync"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
@@ -33,6 +34,15 @@ type RingCount interface {
 	ZonesCount() int
 }
 
+// localLimitEntry stores a previously computed local limit along with the
+// global limit that was used to derive it. This is used to prevent the local
+// limit from shrinking during ingester scale-up when the global limit has not
+// changed.
+type localLimitEntry struct {
+	localLimit  int
+	globalLimit int
+}
+
 // Limiter implements primitives to get the maximum number of series
 // an ingester can handle for a specific tenant
 type Limiter struct {
@@ -43,6 +53,21 @@ type Limiter struct {
 	shardByAllLabels     bool
 	zoneAwarenessEnabled bool
 	AdminLimitMessage    string
+
+	// localLimitCacheEnabled gates the local limit caching behavior that prevents
+	// false throttling during ingester scale-up. When enabled, the limiter caches
+	// the previous local limit per tenant and prevents it from shrinking when the
+	// global limit has not changed. This avoids rejecting writes for tenants who
+	// are within their global quota but whose local limit dropped due to ring growth.
+	//
+	// Note: enabling this may cause temporary over-commitment of the global series
+	// limit until head compaction redistributes series to new ingesters (~2h).
+	localLimitCacheEnabled bool
+
+	// prevLocalLimits caches the previous local limit per user to prevent
+	// false throttling during ingester ring topology changes.
+	prevLocalLimitsMu sync.RWMutex
+	prevLocalLimits   map[string]localLimitEntry
 }
 
 // NewLimiter makes a new in-memory series limiter
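To put a rough number on the "temporary over-commitment" the comments above warn about: while the cached limit is in effect, every ingester in the grown ring may accept up to the pre-scale-up local limit, so the cluster-wide worst case grows with the ratio of new to old ingester count. A back-of-the-envelope sketch, not part of the patch, reusing the figures from the tests below:

```go
package main

import "fmt"

func main() {
	const (
		globalLimit  = 2_700_000
		rf           = 3
		oldIngesters = 75
		newIngesters = 249
	)

	// Local limit cached before the scale-up (the value the cache keeps serving).
	cachedLocal := globalLimit / oldIngesters * rf // 108000

	// Worst case until head compaction resets the cache: every ingester in the new
	// ring fills up to the cached local limit. Dividing by the replication factor
	// converts replicated per-ingester series back into unique series.
	worstCase := cachedLocal * newIngesters / rf // 8964000

	fmt.Printf("up to %d series, about %.1fx the %d global limit\n",
		worstCase, float64(worstCase)/float64(globalLimit), globalLimit)
}
```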
@@ -54,6 +79,7 @@ func NewLimiter(
 	replicationFactor int,
 	zoneAwarenessEnabled bool,
 	AdminLimitMessage string,
+	localLimitCacheEnabled bool,
 ) *Limiter {
 	return &Limiter{
 		limits:               limits,
@@ -63,6 +89,8 @@ func NewLimiter(
 		shardByAllLabels:     shardByAllLabels,
 		zoneAwarenessEnabled: zoneAwarenessEnabled,
 		AdminLimitMessage:    AdminLimitMessage,
+		localLimitCacheEnabled: localLimitCacheEnabled,
+		prevLocalLimits:        make(map[string]localLimitEntry),
 	}
 }
 
@@ -333,7 +361,38 @@ func (l *Limiter) convertGlobalToLocalLimit(userID string, globalLimit int) int
 		numIngesters = min(numIngesters, util.ShuffleShardExpectedInstances(shardSize, l.getNumZones()))
 	}
 
-	return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
+	newLimit := int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
+
+	// Prevent the local limit from shrinking when the global limit has not changed.
+	// During ingester scale-up, numIngesters increases before series redistribute
+	// (which happens at head compaction), causing the local limit to drop below the
+	// series count an ingester already holds. This results in false throttling for
+	// tenants who are within their global limit.
+	//
+	// We only cache when the global limit is unchanged — if the global limit changes
+	// (increase or decrease), we always recalculate to respect the new configuration.
+	if l.localLimitCacheEnabled {
+		l.prevLocalLimitsMu.RLock()
+		prev, ok := l.prevLocalLimits[userID]
+		l.prevLocalLimitsMu.RUnlock()
+		if ok && newLimit < prev.localLimit && globalLimit == prev.globalLimit {
+			return prev.localLimit
+		}
+		l.prevLocalLimitsMu.Lock()
+		l.prevLocalLimits[userID] = localLimitEntry{localLimit: newLimit, globalLimit: globalLimit}
+		l.prevLocalLimitsMu.Unlock()
+	}
+	return newLimit
+}
+
+// ResetLocalLimitCache clears the cached local limit for a specific user.
+// This should be called after the user's TSDB head compaction when series have
+// been redistributed and the ingester's actual series count for this user
+// reflects its true post-resharding ownership.
+func (l *Limiter) ResetLocalLimitCache(userID string) {
+	l.prevLocalLimitsMu.Lock()
+	delete(l.prevLocalLimits, userID)
+	l.prevLocalLimitsMu.Unlock()
 }
 
 func (l *Limiter) getShardSize(userID string) int {
diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go
index 17d723c21f..cabf23b10a 100644
--- a/pkg/ingester/limiter_test.go
+++ b/pkg/ingester/limiter_test.go
@@ -235,12 +235,12 @@ func runLimiterMaxFunctionTest(
 	overrides := validation.NewOverrides(limits, nil)
 
 	// Assert on default sharding strategy.
-	limiter := NewLimiter(overrides, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "")
+	limiter := NewLimiter(overrides, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "", false)
 	actual := runMaxFn(limiter)
 	assert.Equal(t, testData.expectedDefaultSharding, actual)
 
 	// Assert on shuffle sharding strategy.
-	limiter = NewLimiter(overrides, ring, util.ShardingStrategyShuffle, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "")
+	limiter = NewLimiter(overrides, ring, util.ShardingStrategyShuffle, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "", false)
 	actual = runMaxFn(limiter)
 	assert.Equal(t, testData.expectedShuffleSharding, actual)
 	})
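For readers skimming the limiter change above, the guard reduces to a small pure rule. The sketch below restates it outside the ingester package; `cachedEntry` and `resolveLocalLimit` are hypothetical names, not identifiers from the patch, and the numbers mirror the test expectations that follow.

```go
package main

import "fmt"

type cachedEntry struct {
	localLimit  int
	globalLimit int
}

// resolveLocalLimit applies the same rule the patch adds to convertGlobalToLocalLimit:
// keep the previously cached (larger) local limit only while the global limit is
// unchanged; otherwise adopt and remember the freshly computed value.
func resolveLocalLimit(newLimit, globalLimit int, prev *cachedEntry) (int, cachedEntry) {
	if prev != nil && newLimit < prev.localLimit && globalLimit == prev.globalLimit {
		return prev.localLimit, *prev
	}
	return newLimit, cachedEntry{localLimit: newLimit, globalLimit: globalLimit}
}

func main() {
	prev := cachedEntry{localLimit: 108000, globalLimit: 2_700_000} // cached before scale-up

	kept, _ := resolveLocalLimit(32530, 2_700_000, &prev)   // ring grew, global limit unchanged
	recalc, _ := resolveLocalLimit(60240, 5_000_000, &prev) // global limit raised
	fmt.Println(kept, recalc)                               // 108000 (cache wins), 60240 (recalculated)
}
```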
@@ -300,7 +300,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) {
 				MaxGlobalSeriesPerMetric: testData.maxGlobalSeriesPerMetric,
 			}, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxSeriesPerMetric("test", testData.series)
 
 			assert.Equal(t, testData.expected, actual)
@@ -360,7 +360,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) {
 				MaxGlobalMetadataPerMetric: testData.maxGlobalMetadataPerMetric,
 			}, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxMetadataPerMetric("test", testData.metadata)
 
 			assert.Equal(t, testData.expected, actual)
@@ -421,7 +421,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
 				MaxGlobalSeriesPerUser: testData.maxGlobalSeriesPerUser,
 			}, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxSeriesPerUser("test", testData.series)
 
 			assert.Equal(t, testData.expected, actual)
@@ -482,7 +482,7 @@ func TestLimiter_AssertMaxNativeHistogramsSeriesPerUser(t *testing.T) {
 				MaxGlobalNativeHistogramSeriesPerUser: testData.maxGlobalNativeHistogramsSeriesPerUser,
 			}, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxNativeHistogramSeriesPerUser("test", testData.series)
 
 			assert.Equal(t, testData.expected, actual)
@@ -562,7 +562,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
 			// Mock limits
 			limits := validation.NewOverrides(testData.limits, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
 				return testData.series, nil
 			})
@@ -625,7 +625,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
 				MaxGlobalMetricsWithMetadataPerUser: testData.maxGlobalMetadataPerUser,
 			}, nil)
 
-			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
+			limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
 			actual := limiter.AssertMaxMetricsWithMetadataPerUser("test", testData.metadata)
 
 			assert.Equal(t, testData.expected, actual)
@@ -648,7 +648,7 @@ func TestLimiter_FormatError(t *testing.T) {
 		MaxGlobalMetadataPerMetric: 3,
 	}, nil)
 
-	limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, true, 3, false, "please contact administrator to raise it")
+	limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, true, 3, false, "please contact administrator to raise it", false)
 
 	lbls := labels.FromStrings(labels.MetricName, "testMetric")
 	actual := limiter.FormatError("user-1", errMaxSeriesPerUserLimitExceeded, lbls)
@@ -727,3 +727,121 @@ func (m *ringCountMock) ZonesCount() int {
 	args := m.Called()
 	return args.Int(0)
 }
+
+func TestLimiter_convertGlobalToLocalLimit_CacheDuringScaleUp(t *testing.T) {
+	tests := map[string]struct {
+		initialIngesterCount int
+		scaledIngesterCount  int
+		globalLimit          int
+		replicationFactor    int
+		expectedFirstLimit   int
+		expectedSecondLimit  int
+	}{
+		"local limit should not shrink when global limit unchanged during scale-up": {
+			initialIngesterCount: 75,
+			scaledIngesterCount:  249,
+			globalLimit:          2700000,
+			replicationFactor:    3,
+			expectedFirstLimit:   108000, // 2700000 / 75 * 3
+			expectedSecondLimit:  108000, // cached, not 32530
+		},
+		"local limit should increase when ingesters decrease (scale-down)": {
+			initialIngesterCount: 249,
+			scaledIngesterCount:  75,
+			globalLimit:          2700000,
+			replicationFactor:    3,
+			expectedFirstLimit:   32530,  // 2700000 / 249 * 3
+			expectedSecondLimit:  108000, // 2700000 / 75 * 3 (higher, so updated)
+		},
+		"local limit should recalculate when global limit changes": {
+			initialIngesterCount: 75,
+			scaledIngesterCount:  249,
+			globalLimit:          5000000, // changed from initial
+			replicationFactor:    3,
+			expectedFirstLimit:   108000, // 2700000 / 75 * 3 (with initial global)
+			expectedSecondLimit:  60240,  // 5000000 / 249 * 3 (recalculated)
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			// Setup with initial ingester count
+			ring := &ringCountMock{}
+			ring.On("HealthyInstancesCount").Return(testData.initialIngesterCount).Once()
+			ring.On("ZonesCount").Return(1)
+
+			limiter := NewLimiter(
+				nil, ring, "", true, testData.replicationFactor, false, "", true,
+			)
+
+			// First call with initial ingester count
+			initialGlobal := 2700000
+			firstLimit := limiter.convertGlobalToLocalLimit("test-user", initialGlobal)
+			assert.Equal(t, testData.expectedFirstLimit, firstLimit)
+
+			// Scale up: change ingester count
+			ring2 := &ringCountMock{}
+			ring2.On("HealthyInstancesCount").Return(testData.scaledIngesterCount)
+			ring2.On("ZonesCount").Return(1)
+			limiter.ring = ring2
+
+			// Second call after scale-up
+			secondLimit := limiter.convertGlobalToLocalLimit("test-user", testData.globalLimit)
+			assert.Equal(t, testData.expectedSecondLimit, secondLimit)
+		})
+	}
+}
+
+func TestLimiter_convertGlobalToLocalLimit_CacheResetOnHeadCompaction(t *testing.T) {
+	ring := &ringCountMock{}
+	ring.On("HealthyInstancesCount").Return(75)
+	ring.On("ZonesCount").Return(1)
+
+	limiter := NewLimiter(nil, ring, "", true, 3, false, "", true)
+
+	// First call: establishes cache
+	limit1 := limiter.convertGlobalToLocalLimit("test-user", 2700000)
+	assert.Equal(t, 108000, limit1) // 2700000 / 75 * 3
+
+	// Scale up
+	ring2 := &ringCountMock{}
+	ring2.On("HealthyInstancesCount").Return(249)
+	ring2.On("ZonesCount").Return(1)
+	limiter.ring = ring2
+
+	// Second call: cache prevents shrinking
+	limit2 := limiter.convertGlobalToLocalLimit("test-user", 2700000)
+	assert.Equal(t, 108000, limit2) // cached
+
+	// Another user also cached
+	limit2b := limiter.convertGlobalToLocalLimit("other-user", 2700000)
+	assert.Equal(t, 32530, limit2b) // fresh for other-user (no prior cache)
+
+	// Simulate head compaction for test-user only
+	limiter.ResetLocalLimitCache("test-user")
+
+	// test-user: cache cleared, uses new calculation
+	limit3 := limiter.convertGlobalToLocalLimit("test-user", 2700000)
+	assert.Equal(t, 32530, limit3) // 2700000 / 249 * 3
+
+	// other-user: unaffected by test-user's reset, cache still holds
+	// (other-user had no prior higher cache, so it stays at 32530)
+	limit3b := limiter.convertGlobalToLocalLimit("other-user", 2700000)
+	assert.Equal(t, 32530, limit3b)
+}
+
+func TestLimiter_convertGlobalToLocalLimit_GlobalLimitDecrease(t *testing.T) {
+	ring := &ringCountMock{}
+	ring.On("HealthyInstancesCount").Return(100)
+	ring.On("ZonesCount").Return(1)
+
+	limiter := NewLimiter(nil, ring, "", true, 3, false, "", true)
+
+	// First call with high global limit
+	limit1 := limiter.convertGlobalToLocalLimit("test-user", 5000000)
+	assert.Equal(t, 150000, limit1) // 5000000 / 100 * 3
+
+	// Global limit decreases (customer downgrade)
+	limit2 := limiter.convertGlobalToLocalLimit("test-user", 2000000)
+	assert.Equal(t, 60000, limit2) // 2000000 / 100 * 3 (recalculated, not cached)
+}
diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go
index fa70e090c3..529f1a4375 100644
--- a/pkg/ingester/user_metrics_metadata_test.go
+++ b/pkg/ingester/user_metrics_metadata_test.go
@@ -37,7 +37,7 @@ func Test_UserMetricsMetadata(t *testing.T) {
 	limits := validation.Limits{}
 	overrides := validation.NewOverrides(limits, nil)
 
-	limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 1, false, "")
+	limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 1, false, "", false)
 
 	tests := []struct {
 		description string
diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go
index 38be322854..96f298d66b 100644
--- a/pkg/ingester/user_state_test.go
+++ b/pkg/ingester/user_state_test.go
@@ -83,7 +83,7 @@ func TestMetricCounter(t *testing.T) {
 
 			// We're testing code that's not dependent on sharding strategy, replication factor, etc. To simplify the test,
 			// we use local limit only.
-			limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 3, false, "")
+			limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 3, false, "", false)
 			mc := newMetricCounter(limiter, ignored)
 
 			for i := 0; i < tc.series; i++ {
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json
index 915c689d15..4a707bf5ab 100644
--- a/schemas/cortex-config-schema.json
+++ b/schemas/cortex-config-schema.json
@@ -4877,6 +4877,12 @@
       },
       "type": "object"
     },
+    "local_limit_cache_enabled": {
+      "default": false,
+      "description": "[Experimental] When enabled, the per-ingester local series limit is cached and prevented from shrinking during ring topology changes if the global limit has not changed. This prevents false throttling during ingester scale-up at the cost of potential temporary over-commitment until head compaction.",
+      "type": "boolean",
+      "x-cli-flag": "ingester.local-limit-cache-enabled"
+    },
     "matchers_cache_max_items": {
      "default": 0,
      "description": "Maximum number of entries in the regex matchers cache. 0 to disable.",