From 37bb3b8454860c0c348017a72bc6de7f5c37b756 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 14:28:04 -0700 Subject: [PATCH 01/11] Add Query Resource Based Eviction Signed-off-by: Essam Eldaly --- docs/blocks-storage/querier.md | 37 +++ docs/blocks-storage/store-gateway.md | 37 +++ docs/configuration/config-file-reference.md | 111 ++++++++ pkg/configs/query_protection.go | 84 +++++- pkg/configs/query_protection_test.go | 86 +++++- pkg/cortex/modules.go | 7 +- pkg/querier/querier.go | 48 +++- pkg/querier/querier_eviction_test.go | 119 +++++++++ pkg/querier/querier_test.go | 24 +- pkg/ruler/ruler_test.go | 2 +- pkg/util/queryeviction/engine_wrapper.go | 157 +++++++++++ pkg/util/queryeviction/engine_wrapper_test.go | 247 ++++++++++++++++++ pkg/util/queryeviction/evictor.go | 150 +++++++++++ pkg/util/queryeviction/evictor_test.go | 233 +++++++++++++++++ pkg/util/queryeviction/registry.go | 138 ++++++++++ pkg/util/queryeviction/registry_test.go | 198 ++++++++++++++ schemas/cortex-config-schema.json | 144 ++++++++++ 17 files changed, 1790 insertions(+), 32 deletions(-) create mode 100644 pkg/querier/querier_eviction_test.go create mode 100644 pkg/util/queryeviction/engine_wrapper.go create mode 100644 pkg/util/queryeviction/engine_wrapper_test.go create mode 100644 pkg/util/queryeviction/evictor.go create mode 100644 pkg/util/queryeviction/evictor_test.go create mode 100644 pkg/util/queryeviction/registry.go create mode 100644 pkg/util/queryeviction/registry_test.go diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 24bc6a4c3a8..b633ddbc8fa 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -330,6 +330,43 @@ querier: # type. 0 to disable. 
# CLI flag: -querier.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -querier.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -querier.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -querier.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. 
+ # CLI flag: -querier.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `blocks_storage_config` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 965a9089f2b..59250604fa9 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -372,6 +372,43 @@ store_gateway: # CLI flag: -store-gateway.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in + # percentage, between 0 and 1. monitored_resources config must include + # the resource type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -store-gateway.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. + # CLI flag: -store-gateway.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. 
+ # CLI flag: -store-gateway.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -store-gateway.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] + hedged_request: # If true, hedged requests are applied to object store calls. It can help # with reducing tail latency. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f35e99c3276..e6ca704464a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3872,6 +3872,43 @@ query_protection: # disable. # CLI flag: -ingester.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -ingester.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -ingester.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -ingester.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. 
+ # CLI flag: -ingester.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -ingester.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -ingester.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `ingester_client_config` @@ -5016,6 +5053,43 @@ query_protection: # disable. # CLI flag: -querier.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -querier.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -querier.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. 
+ # CLI flag: -querier.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -querier.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -querier.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] ``` ### `query_frontend_config` @@ -6785,6 +6859,43 @@ query_protection: # CLI flag: -store-gateway.query-protection.rejection.threshold.heap-utilization [heap_utilization: | default = 0] + eviction: + threshold: + # EXPERIMENTAL: Max CPU utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.cpu-utilization + [cpu_utilization: | default = 0] + + # EXPERIMENTAL: Max heap utilization that this instance can reach before + # evicting the heaviest running query (across all tenants) in percentage, + # between 0 and 1. monitored_resources config must include the resource + # type. 0 to disable. + # CLI flag: -store-gateway.query-protection.eviction.threshold.heap-utilization + [heap_utilization: | default = 0] + + # EXPERIMENTAL: How frequently the evictor checks system resource + # utilization. + # CLI flag: -store-gateway.query-protection.eviction.check-interval + [check_interval: | default = 1s] + + # EXPERIMENTAL: Number of check intervals to wait after an eviction before + # evicting again. 
+ # CLI flag: -store-gateway.query-protection.eviction.cooldown-period + [cooldown_period: | default = 3] + + # EXPERIMENTAL: The query metric used to determine the heaviest query for + # eviction. Supported values: fetched_samples, fetched_series, + # fetched_chunks, fetched_chunk_bytes. + # CLI flag: -store-gateway.query-protection.eviction.eviction-metric + [eviction_metric: | default = "fetched_samples"] + + # EXPERIMENTAL: Minimum time a query must be running before it becomes + # eligible for eviction. Queries younger than this are ignored. + # CLI flag: -store-gateway.query-protection.eviction.min-query-age + [min_query_age: | default = 10s] + hedged_request: # If true, hedged requests are applied to object store calls. It can help with # reducing tail latency. diff --git a/pkg/configs/query_protection.go b/pkg/configs/query_protection.go index 7b48e9b2def..e2b1aa2b1a5 100644 --- a/pkg/configs/query_protection.go +++ b/pkg/configs/query_protection.go @@ -3,47 +3,115 @@ package configs import ( "errors" "flag" + "fmt" "strings" + "time" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/resource" ) +// recognizedEvictionMetrics lists the valid values for eviction_metric. +var recognizedEvictionMetrics = map[string]bool{ + "fetched_samples": true, + "fetched_series": true, + "fetched_chunks": true, + "fetched_chunk_bytes": true, +} + type QueryProtection struct { - Rejection rejection `json:"rejection"` + Rejection rejection `json:"rejection"` + Eviction EvictionConfig `yaml:"eviction"` } type rejection struct { - Threshold threshold `yaml:"threshold"` + Threshold Threshold `yaml:"threshold"` } -type threshold struct { +// Threshold holds CPU and heap utilization thresholds (0-1 range). +type Threshold struct { CPUUtilization float64 `yaml:"cpu_utilization"` HeapUtilization float64 `yaml:"heap_utilization"` } +// EvictionConfig configures the resource-based query evictor. 
+type EvictionConfig struct { + Threshold Threshold `yaml:"threshold"` + CheckInterval time.Duration `yaml:"check_interval"` + CooldownPeriod int `yaml:"cooldown_period"` + EvictionMetric string `yaml:"eviction_metric"` + MinQueryAge time.Duration `yaml:"min_query_age"` +} + +// Enabled returns true when at least one eviction threshold is greater than 0. +func (c EvictionConfig) Enabled() bool { + return c.Threshold.CPUUtilization > 0 || c.Threshold.HeapUtilization > 0 +} + func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + // Rejection flags f.Float64Var(&cfg.Rejection.Threshold.CPUUtilization, prefix+"query-protection.rejection.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") f.Float64Var(&cfg.Rejection.Threshold.HeapUtilization, prefix+"query-protection.rejection.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this instance can reach before rejecting new query request (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + + // Eviction flags + f.Float64Var(&cfg.Eviction.Threshold.CPUUtilization, prefix+"query-protection.eviction.threshold.cpu-utilization", 0, "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.") + f.Float64Var(&cfg.Eviction.Threshold.HeapUtilization, prefix+"query-protection.eviction.threshold.heap-utilization", 0, "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. 
monitored_resources config must include the resource type. 0 to disable.") + f.DurationVar(&cfg.Eviction.CheckInterval, prefix+"query-protection.eviction.check-interval", 1*time.Second, "EXPERIMENTAL: How frequently the evictor checks system resource utilization.") + f.IntVar(&cfg.Eviction.CooldownPeriod, prefix+"query-protection.eviction.cooldown-period", 3, "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.") + f.StringVar(&cfg.Eviction.EvictionMetric, prefix+"query-protection.eviction.eviction-metric", "fetched_samples", "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.") + f.DurationVar(&cfg.Eviction.MinQueryAge, prefix+"query-protection.eviction.min-query-age", 10*time.Second, "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.") } func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) error { - thresholdCfg := cfg.Rejection.Threshold - if thresholdCfg.CPUUtilization > 1 || thresholdCfg.CPUUtilization < 0 { + // Validate rejection thresholds + rejThreshold := cfg.Rejection.Threshold + if rejThreshold.CPUUtilization > 1 || rejThreshold.CPUUtilization < 0 { return errors.New("cpu_utilization must be between 0 and 1") } - if thresholdCfg.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { + if rejThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { return errors.New("monitored_resources config must include \"cpu\" as well") } - if thresholdCfg.HeapUtilization > 1 || thresholdCfg.HeapUtilization < 0 { + if rejThreshold.HeapUtilization > 1 || rejThreshold.HeapUtilization < 0 { return errors.New("heap_utilization must be between 0 and 1") } - if thresholdCfg.HeapUtilization > 0 && 
!strings.Contains(monitoredResources.String(), string(resource.Heap)) { + if rejThreshold.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) { return errors.New("monitored_resources config must include \"heap\" as well") } + // Validate eviction thresholds + evThreshold := cfg.Eviction.Threshold + if evThreshold.CPUUtilization > 1 || evThreshold.CPUUtilization < 0 { + return errors.New("eviction cpu_utilization must be between 0 and 1") + } + + if evThreshold.HeapUtilization > 1 || evThreshold.HeapUtilization < 0 { + return errors.New("eviction heap_utilization must be between 0 and 1") + } + + if cfg.Eviction.Enabled() { + if cfg.Eviction.CheckInterval <= 0 { + return errors.New("eviction check_interval must be greater than 0 when eviction is enabled") + } + + if cfg.Eviction.CooldownPeriod < 0 { + return errors.New("eviction cooldown_period must be >= 0") + } + + if !recognizedEvictionMetrics[cfg.Eviction.EvictionMetric] { + return fmt.Errorf("unrecognized eviction_metric %q; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes", cfg.Eviction.EvictionMetric) + } + + if evThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { + return errors.New("monitored_resources config must include \"cpu\" when eviction cpu threshold is set") + } + + if evThreshold.HeapUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.Heap)) { + return errors.New("monitored_resources config must include \"heap\" when eviction heap threshold is set") + } + } + return nil } diff --git a/pkg/configs/query_protection_test.go b/pkg/configs/query_protection_test.go index f06b4be7f38..ce92399208c 100644 --- a/pkg/configs/query_protection_test.go +++ b/pkg/configs/query_protection_test.go @@ -2,9 +2,14 @@ package configs import ( "errors" + "flag" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + 
"github.com/cortexproject/cortex/pkg/util/flagext" ) func Test_Validate(t *testing.T) { @@ -16,7 +21,7 @@ func Test_Validate(t *testing.T) { "correct config should pass validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, HeapUtilization: 0.5, }, @@ -28,7 +33,7 @@ func Test_Validate(t *testing.T) { "utilization config less than 0 should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: -0.5, HeapUtilization: 0.5, }, @@ -40,7 +45,7 @@ func Test_Validate(t *testing.T) { "utilization config greater than 1 should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, HeapUtilization: 1.5, }, @@ -52,7 +57,7 @@ func Test_Validate(t *testing.T) { "missing cpu in monitored_resources config should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ CPUUtilization: 0.5, }, }, @@ -63,7 +68,7 @@ func Test_Validate(t *testing.T) { "missing heap in monitored_resources config should fail validation": { queryProtection: QueryProtection{ Rejection: rejection{ - Threshold: threshold{ + Threshold: Threshold{ HeapUtilization: 0.5, }, }, @@ -73,7 +78,7 @@ func Test_Validate(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - err := tc.queryProtection.Validate(tc.monitoredResources) + err := tc.queryProtection.Validate(flagext.StringSliceCSV(tc.monitoredResources)) if tc.err != nil { require.Errorf(t, err, tc.err.Error()) } else { @@ -82,3 +87,72 @@ func Test_Validate(t *testing.T) { }) } } + +func Test_EvictionConfig_Enabled(t *testing.T) { + assert.False(t, EvictionConfig{}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{CPUUtilization: 0.8}}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{HeapUtilization: 
0.85}}.Enabled()) + assert.True(t, EvictionConfig{Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}}.Enabled()) +} + +func Test_EvictionConfig_Validation(t *testing.T) { + validBase := func() QueryProtection { + return QueryProtection{ + Eviction: EvictionConfig{ + Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}, + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + }, + } + } + + tests := map[string]struct { + modify func(*QueryProtection) + monitoredResources []string + err string + }{ + "valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""}, + "cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, + "cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = -0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, + "heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"}, + "cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"}, + "unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`}, + "cpu without monitored fails": {func(qp 
*QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`}, + "heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`}, + "cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""}, + "disabled skips interval check": {func(qp *QueryProtection) { + qp.Eviction.Threshold = Threshold{} + qp.Eviction.CheckInterval = 0 + }, []string{}, ""}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + qp := validBase() + tc.modify(&qp) + err := qp.Validate(flagext.StringSliceCSV(tc.monitoredResources)) + if tc.err != "" { + require.EqualError(t, err, tc.err) + } else { + require.NoError(t, err) + } + }) + } +} + +func Test_RegisterFlagsWithPrefix_EvictionDefaults(t *testing.T) { + var cfg QueryProtection + fs := flag.NewFlagSet("test", flag.ContinueOnError) + cfg.RegisterFlagsWithPrefix(fs, "querier.") + require.NoError(t, fs.Parse([]string{})) + + assert.Equal(t, float64(0), cfg.Eviction.Threshold.CPUUtilization) + assert.Equal(t, float64(0), cfg.Eviction.Threshold.HeapUtilization) + assert.Equal(t, 1*time.Second, cfg.Eviction.CheckInterval) + assert.Equal(t, 3, cfg.Eviction.CooldownPeriod) + assert.Equal(t, "fetched_samples", cfg.Eviction.EvictionMetric) + assert.False(t, cfg.Eviction.Enabled()) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f24d2db0006..fd6d9c7389f 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -284,7 +284,8 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { querierRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "querier"}, prometheus.DefaultRegisterer) // Create a querier queryable and PromQL engine - t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, 
t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) + var evictorService services.Service + t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, evictorService = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger, t.OverridesConfig.QueryPartialData, t.ResourceMonitor) // Use distributor as default MetadataQuerier t.MetadataQuerier = t.Distributor @@ -292,7 +293,7 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { // Register the default endpoints that are always enabled for the querier module t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor) - return nil, nil + return evictorService, nil } // Enable merge querier if multi tenant query federation is enabled @@ -701,7 +702,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer) } else { // TODO: Consider wrapping logger to differentiate from querier module logger - queryable, _, queryEngine = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) + queryable, _, queryEngine, _ = querier.New(t.Cfg.Querier, t.OverridesConfig, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger, t.OverridesConfig.RulesPartialData, nil) } managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, pusher, queryable, queryEngine, t.OverridesConfig, metrics, prometheus.DefaultRegisterer) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8c701283bc1..064a8ca309f 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -33,7 +33,9 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/parquetutil" + 
"github.com/cortexproject/cortex/pkg/util/queryeviction" "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" @@ -233,7 +235,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc { } // New builds a queryable and promql engine. -func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) { +func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc, resourceMonitor resource.IMonitor) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine, services.Service) { iteratorFunc := getChunksIteratorFunction(cfg) // Create resource-based limiter if resource monitor is available and thresholds are configured. @@ -255,6 +257,36 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor } } + // Set up query eviction if configured. 
+ var queryRegistry *queryeviction.QueryRegistry + var queryEvictor *queryeviction.QueryEvictor + + evictionCfg := cfg.QueryProtection.Eviction + if evictionCfg.Enabled() && resourceMonitor != nil { + evCfg := queryeviction.EvictionConfig{ + CPUUtilization: evictionCfg.Threshold.CPUUtilization, + HeapUtilization: evictionCfg.Threshold.HeapUtilization, + CheckInterval: evictionCfg.CheckInterval, + CooldownPeriod: evictionCfg.CooldownPeriod, + EvictionMetric: evictionCfg.EvictionMetric, + MinQueryAge: evictionCfg.MinQueryAge, + } + + metricFunc, err := queryeviction.ResolveMetricFunc(evCfg.EvictionMetric) + if err != nil { + level.Error(logger).Log("msg", "invalid eviction metric", "err", err) + } else { + queryRegistry = queryeviction.NewQueryRegistry(metricFunc) + queryEvictor, err = queryeviction.NewQueryEvictor( + resourceMonitor, queryRegistry, evCfg, + logger, reg, "querier", + ) + if err != nil { + level.Error(logger).Log("msg", "failed to create query evictor", "err", err) + } + } + } + distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts, limits, nil) ns := make([]QueryableWithFilter, len(stores)) @@ -298,7 +330,19 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor }, } queryEngine := engine.New(opts, cfg.ThanosEngine, reg) - return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine + + // Wrap the engine with eviction support if the registry was created. + var eng engine.QueryEngine = queryEngine + if queryRegistry != nil { + eng = queryeviction.NewResourceEvictingEngine(queryEngine, queryRegistry) + } + + // Return the evictor as a service so the caller can manage its lifecycle. 
+ var evictorService services.Service + if queryEvictor != nil { + evictorService = queryEvictor + } + return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, eng, evictorService } // NewSampleAndChunkQueryable creates a SampleAndChunkQueryable from a diff --git a/pkg/querier/querier_eviction_test.go b/pkg/querier/querier_eviction_test.go new file mode 100644 index 00000000000..8e643ba8fd5 --- /dev/null +++ b/pkg/querier/querier_eviction_test.go @@ -0,0 +1,119 @@ +package querier + +import ( + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/queryeviction" + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// simpleMonitor implements resource.IMonitor for testing. +type simpleMonitor struct{} + +func (m *simpleMonitor) GetCPUUtilization() float64 { return 0.5 } +func (m *simpleMonitor) GetHeapUtilization() float64 { return 0.5 } + +// Compile-time check that simpleMonitor implements resource.IMonitor. 
+var _ resource.IMonitor = (*simpleMonitor)(nil) + +func TestQuerier_EvictionIntegration(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + evictionCPU float64 + evictionHeap float64 + resourceMonitor resource.IMonitor + expectWrapped bool + expectService bool + }{ + "engine wrapped when eviction enabled and resourceMonitor provided": { + evictionCPU: 0.85, + evictionHeap: 0.85, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + "engine not wrapped when eviction disabled (both thresholds 0)": { + evictionCPU: 0, + evictionHeap: 0, + resourceMonitor: &simpleMonitor{}, + expectWrapped: false, + expectService: false, + }, + "engine not wrapped when resourceMonitor is nil": { + evictionCPU: 0.85, + evictionHeap: 0.85, + resourceMonitor: nil, + expectWrapped: false, + expectService: false, + }, + "engine wrapped with CPU-only threshold": { + evictionCPU: 0.9, + evictionHeap: 0, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + "engine wrapped with heap-only threshold": { + evictionCPU: 0, + evictionHeap: 0.9, + resourceMonitor: &simpleMonitor{}, + expectWrapped: true, + expectService: true, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + // Disable active query tracker to avoid mmap error in tests. + cfg.ActiveQueryTrackerDir = "" + + cfg.QueryProtection = configs.QueryProtection{ + Eviction: configs.EvictionConfig{ + Threshold: configs.Threshold{ + CPUUtilization: tc.evictionCPU, + HeapUtilization: tc.evictionHeap, + }, + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + }, + } + + overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) + + distributor := &MockDistributor{} + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
+ Return(&client.QueryStreamResponse{}, nil) + + queryables := []QueryableWithFilter{} + + _, _, eng, svc := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, tc.resourceMonitor) + + if tc.expectWrapped { + _, ok := eng.(*queryeviction.ResourceEvictingEngine) + assert.True(t, ok, "expected engine to be *queryeviction.ResourceEvictingEngine") + } else { + _, ok := eng.(*queryeviction.ResourceEvictingEngine) + assert.False(t, ok, "expected engine NOT to be *queryeviction.ResourceEvictingEngine") + } + + if tc.expectService { + assert.NotNil(t, svc, "expected evictor service to be non-nil") + } else { + assert.Nil(t, svc, "expected evictor service to be nil") + } + }) + } +} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index a072abc2221..f062eeaac21 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -587,7 +587,7 @@ func TestQuerier(t *testing.T) { overrides := validation.NewOverrides(DefaultLimitsConfig(), nil) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) testRangeQuery(t, queryable, queryEngine, through, query, enc) }) } @@ -691,7 +691,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { overrides := validation.NewOverrides(limits, nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, 
err) @@ -784,7 +784,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "0") queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, "dummy", c.queryStartTime, c.queryEndTime, time.Minute) require.NoError(t, err) @@ -876,7 +876,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) queryEngine := promql.NewEngine(opts) ctx := user.InjectOrgID(context.Background(), "test") @@ -914,7 +914,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") now := time.Now() @@ -972,7 +972,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) { distributor := &emptyDistributor{} queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, 
nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "test") @@ -1122,7 +1122,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor := &MockDistributor{} distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, testData.query, testData.queryStartTime, testData.queryEndTime, time.Minute) require.NoError(t, err) @@ -1150,7 +1150,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1191,7 +1191,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelNamesStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, 
log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1219,7 +1219,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, matchers).Return([]labels.Labels{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1246,7 +1246,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) distributor.On("LabelValuesForLabelNameStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) + queryable, _, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger(), nil, nil) q, err := queryable.Querier(util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime)) require.NoError(t, err) @@ -1583,7 +1583,7 @@ func TestShortTermQueryToLTS(t *testing.T) { limits.QueryStoreAfter = model.Duration(c.queryStoreAfter) overrides := validation.NewOverrides(limits, nil) - queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) + 
queryable, _, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(chunkStore))}, nil, log.NewNopLogger(), nil, nil) ctx := user.InjectOrgID(context.Background(), "0") query, err := engine.NewRangeQuery(ctx, queryable, nil, "dummy", c.mint, c.maxt, 1*time.Minute) require.NoError(t, err) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 66c43ac46cb..9c791edeae0 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -242,7 +242,7 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg querierTestConfig.Cfg.ActiveQueryTrackerDir = "" overrides := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) - q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) + q, _, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, reg, logger, nil, nil) return func(mint, maxt int64) (storage.Querier, error) { return q.Querier(mint, maxt) } diff --git a/pkg/util/queryeviction/engine_wrapper.go b/pkg/util/queryeviction/engine_wrapper.go new file mode 100644 index 00000000000..d6342b2732a --- /dev/null +++ b/pkg/util/queryeviction/engine_wrapper.go @@ -0,0 +1,157 @@ +package queryeviction + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/stats" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/engine" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util/requestmeta" +) + +// Compile-time check that ResourceEvictingEngine implements engine.QueryEngine. 
+var _ engine.QueryEngine = (*ResourceEvictingEngine)(nil) + +// ResourceEvictingEngine wraps a QueryEngine to register running queries +// with a QueryRegistry, enabling resource-based eviction. +type ResourceEvictingEngine struct { + inner engine.QueryEngine + registry *QueryRegistry +} + +// NewResourceEvictingEngine wraps the given engine. +// If registry is nil, the wrapper is a no-op passthrough. +func NewResourceEvictingEngine(inner engine.QueryEngine, registry *QueryRegistry) *ResourceEvictingEngine { + return &ResourceEvictingEngine{ + inner: inner, + registry: registry, + } +} + +func (e *ResourceEvictingEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + query, err := e.inner.NewInstantQuery(ctx, q, opts, qs, ts) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { + query, err := e.inner.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time, qs string) (promql.Query, error) { + query, err := e.inner.MakeInstantQueryFromPlan(ctx, q, opts, root, ts, qs) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +func (e *ResourceEvictingEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start time.Time, end time.Time, interval time.Duration, qs string) 
(promql.Query, error) { + query, err := e.inner.MakeRangeQueryFromPlan(ctx, q, opts, root, start, end, interval, qs) + if err != nil { + return nil, err + } + if e.registry == nil { + return query, nil + } + return e.wrapQuery(ctx, query, qs), nil +} + +// wrapQuery creates a trackedQuery that registers/deregisters with the registry. +// It creates a cancellable child context so the evictor can cancel individual queries. +func (e *ResourceEvictingEngine) wrapQuery(ctx context.Context, inner promql.Query, queryExpr string) *trackedQuery { + childCtx, cancel := context.WithCancel(ctx) + + //lint:ignore faillint wrapper around upstream method + userID, _ := user.ExtractOrgID(ctx) + + return &trackedQuery{ + inner: inner, + registry: e.registry, + queryExpr: queryExpr, + userID: userID, + requestID: requestmeta.RequestIdFromContext(ctx), + cancel: cancel, + ctx: childCtx, + } +} + +// trackedQuery wraps a promql.Query to register/deregister with the registry. +type trackedQuery struct { + inner promql.Query + registry *QueryRegistry + queryID uint64 + queryExpr string + userID string + requestID string + cancel context.CancelFunc + ctx context.Context // cancellable child context +} + +// Exec registers the query, executes it with a cancellable context, +// then deregisters on completion. If the query was evicted (child context +// cancelled by evictor but parent context not cancelled), wraps the error +// as ErrQueryEvicted. +func (q *trackedQuery) Exec(ctx context.Context) *promql.Result { + queryStats := querier_stats.FromContext(q.ctx) + q.queryID = q.registry.Register(q.cancel, queryStats, q.queryExpr, q.userID, q.requestID) + defer q.registry.Deregister(q.queryID) + + result := q.inner.Exec(q.ctx) + + // Detect eviction: child context cancelled but parent context is still active. 
+ if result.Err != nil && q.ctx.Err() != nil && ctx.Err() == nil { + return &promql.Result{ + Err: promql.ErrStorage{Err: &ErrQueryEvicted{}}, + } + } + + return result +} + +// Statement delegates to the inner query. +func (q *trackedQuery) Statement() parser.Statement { + return q.inner.Statement() +} + +// Stats delegates to the inner query. +func (q *trackedQuery) Stats() *stats.Statistics { + return q.inner.Stats() +} + +// Close delegates to the inner query. +func (q *trackedQuery) Close() { + q.inner.Close() +} + +// Cancel cancels the tracked query's child context. +func (q *trackedQuery) Cancel() { + q.cancel() +} + +// String delegates to the inner query. +func (q *trackedQuery) String() string { + return q.inner.String() +} diff --git a/pkg/util/queryeviction/engine_wrapper_test.go b/pkg/util/queryeviction/engine_wrapper_test.go new file mode 100644 index 00000000000..884cf1349b0 --- /dev/null +++ b/pkg/util/queryeviction/engine_wrapper_test.go @@ -0,0 +1,247 @@ +package queryeviction + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/util/stats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" + + "github.com/cortexproject/cortex/pkg/engine" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" +) + +// Compile-time check that mockEngine implements engine.QueryEngine. +var _ engine.QueryEngine = (*mockEngine)(nil) + +// mockEngine is a minimal implementation of engine.QueryEngine for testing. 
+type mockEngine struct { + query promql.Query // the query to return from all methods + err error // optional error to return +} + +func (m *mockEngine) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _ time.Time) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _, _ time.Time, _ time.Duration) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) MakeInstantQueryFromPlan(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ logicalplan.Node, _ time.Time, _ string) (promql.Query, error) { + return m.query, m.err +} + +func (m *mockEngine) MakeRangeQueryFromPlan(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ logicalplan.Node, _, _ time.Time, _ time.Duration, _ string) (promql.Query, error) { + return m.query, m.err +} + +// mockQuery is a minimal implementation of promql.Query for testing. +type mockQuery struct { + execResult *promql.Result + execFunc func(ctx context.Context) *promql.Result // optional custom exec + closed bool +} + +func (q *mockQuery) Exec(ctx context.Context) *promql.Result { + if q.execFunc != nil { + return q.execFunc(ctx) + } + return q.execResult +} + +func (q *mockQuery) Close() { + q.closed = true +} + +func (q *mockQuery) Statement() parser.Statement { + return nil +} + +func (q *mockQuery) Stats() *stats.Statistics { + return nil +} + +func (q *mockQuery) Cancel() {} + +func (q *mockQuery) String() string { + return "mock_query" +} + +// ctxWithStats returns a context that has QueryStats initialized. 
+func ctxWithStats(ctx context.Context) context.Context { + _, ctx = querier_stats.ContextWithEmptyStats(ctx) + return ctx +} + +func TestEngineWrapper_RegisterAndDeregisterDuringExec(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + var registeredLen int + + mq := &mockQuery{ + execFunc: func(ctx context.Context) *promql.Result { + // During exec, the query should be registered. + registeredLen = registry.Len() + return &promql.Result{} + }, + } + + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + // Before exec, registry should be empty. + assert.Equal(t, 0, registry.Len()) + + _ = query.Exec(ctx) + + // During exec, the query was registered. + assert.Equal(t, 1, registeredLen, "query should be registered during Exec") + + // After exec, the query should be deregistered. + assert.Equal(t, 0, registry.Len(), "query should be deregistered after Exec") +} + +func TestEngineWrapper_EvictedQueryReturnsErrQueryEvicted(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + mq := &mockQuery{ + execFunc: func(ctx context.Context) *promql.Result { + // Simulate eviction: find the registered query and cancel it. + heaviest := registry.FindHeaviest(0) + require.NotNil(t, heaviest, "query should be registered during Exec") + heaviest.Cancel() // This cancels the child context, simulating evictor behavior. + + // The inner query would see a cancelled context and return an error. 
+ return &promql.Result{Err: context.Canceled} + }, + } + + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + result := query.Exec(ctx) + + // The result should contain ErrQueryEvicted wrapped in ErrStorage for 500 status. + require.NotNil(t, result.Err) + var storageErr promql.ErrStorage + require.ErrorAs(t, result.Err, &storageErr, "error should be promql.ErrStorage, got: %v", result.Err) + var evictedErr *ErrQueryEvicted + assert.True(t, errors.As(storageErr.Err, &evictedErr), "inner error should be ErrQueryEvicted, got: %v", storageErr.Err) +} + +func TestEngineWrapper_NonEvictedQueryReturnsNormalResult(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + expectedResult := &promql.Result{ + Value: promql.Scalar{T: 1000, V: 42.0}, + } + + mq := &mockQuery{execResult: expectedResult} + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + result := query.Exec(ctx) + + // The result should be passed through unchanged. + assert.NoError(t, result.Err) + assert.Equal(t, expectedResult.Value, result.Value, "result value should be passed through unchanged") +} + +func TestEngineWrapper_NilRegistryIsNoOpPassthrough(t *testing.T) { + mq := &mockQuery{ + execResult: &promql.Result{Value: promql.Scalar{T: 1000, V: 42.0}}, + } + inner := &mockEngine{query: mq} + + // Create wrapper with nil registry — should be a no-op passthrough. + wrapper := NewResourceEvictingEngine(inner, nil) + + ctx := context.Background() + + // Test all four engine methods return the inner query directly (not wrapped). 
+ instantQuery, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + assert.Equal(t, mq, instantQuery, "nil registry should return inner query directly for NewInstantQuery") + + rangeQuery, err := wrapper.NewRangeQuery(ctx, nil, nil, "up", time.Now(), time.Now(), time.Minute) + require.NoError(t, err) + assert.Equal(t, mq, rangeQuery, "nil registry should return inner query directly for NewRangeQuery") + + instantPlanQuery, err := wrapper.MakeInstantQueryFromPlan(ctx, nil, nil, nil, time.Now(), "up") + require.NoError(t, err) + assert.Equal(t, mq, instantPlanQuery, "nil registry should return inner query directly for MakeInstantQueryFromPlan") + + rangePlanQuery, err := wrapper.MakeRangeQueryFromPlan(ctx, nil, nil, nil, time.Now(), time.Now(), time.Minute, "up") + require.NoError(t, err) + assert.Equal(t, mq, rangePlanQuery, "nil registry should return inner query directly for MakeRangeQueryFromPlan") +} + +func TestEngineWrapper_CloseCallsInnerClose(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + + mq := &mockQuery{ + execResult: &promql.Result{}, + } + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + query, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", time.Now()) + require.NoError(t, err) + + // Close the tracked query. + query.Close() + + // Verify the inner query's Close was called. + assert.True(t, mq.closed, "Close on tracked query should delegate to inner query's Close") +} + +func TestEngineWrapper_AllMethodsWrapQuery(t *testing.T) { + registry := NewQueryRegistry(testMetricFunc) + mq := &mockQuery{execResult: &promql.Result{}} + inner := &mockEngine{query: mq} + wrapper := NewResourceEvictingEngine(inner, registry) + + ctx := ctxWithStats(context.Background()) + now := time.Now() + + // All four methods should return a trackedQuery (not the raw mockQuery). 
+ q1, err := wrapper.NewInstantQuery(ctx, nil, nil, "up", now) + require.NoError(t, err) + _, ok := q1.(*trackedQuery) + assert.True(t, ok, "NewInstantQuery should return a trackedQuery") + + q2, err := wrapper.NewRangeQuery(ctx, nil, nil, "up", now, now, time.Minute) + require.NoError(t, err) + _, ok = q2.(*trackedQuery) + assert.True(t, ok, "NewRangeQuery should return a trackedQuery") + + q3, err := wrapper.MakeInstantQueryFromPlan(ctx, nil, nil, nil, now, "up") + require.NoError(t, err) + _, ok = q3.(*trackedQuery) + assert.True(t, ok, "MakeInstantQueryFromPlan should return a trackedQuery") + + q4, err := wrapper.MakeRangeQueryFromPlan(ctx, nil, nil, nil, now, now, time.Minute, "up") + require.NoError(t, err) + _, ok = q4.(*trackedQuery) + assert.True(t, ok, "MakeRangeQueryFromPlan should return a trackedQuery") +} diff --git a/pkg/util/queryeviction/evictor.go b/pkg/util/queryeviction/evictor.go new file mode 100644 index 00000000000..08b835f482e --- /dev/null +++ b/pkg/util/queryeviction/evictor.go @@ -0,0 +1,150 @@ +package queryeviction + +import ( + "context" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// EvictionConfig configures the resource-based query evictor. +type EvictionConfig struct { + CPUUtilization float64 `yaml:"cpu_utilization"` + HeapUtilization float64 `yaml:"heap_utilization"` + CheckInterval time.Duration `yaml:"check_interval"` + CooldownPeriod int `yaml:"cooldown_period"` + EvictionMetric string `yaml:"eviction_metric"` + MinQueryAge time.Duration `yaml:"min_query_age"` +} + +// Enabled returns true if at least one threshold is > 0. 
+func (c EvictionConfig) Enabled() bool { + return c.CPUUtilization > 0 || c.HeapUtilization > 0 +} + +// QueryEvictor monitors system-wide resource utilization and evicts +// the heaviest running query when thresholds are breached. +type QueryEvictor struct { + services.Service + + monitor resource.IMonitor + registry *QueryRegistry + cfg EvictionConfig + logger log.Logger + + // Prometheus metrics + evictionsTotal *prometheus.CounterVec // labels: resource, component +} + +// NewQueryEvictor creates a new evictor. Returns nil if config is disabled. +func NewQueryEvictor( + monitor resource.IMonitor, + registry *QueryRegistry, + cfg EvictionConfig, + logger log.Logger, + reg prometheus.Registerer, + component string, +) (*QueryEvictor, error) { + if !cfg.Enabled() { + return nil, nil + } + + e := &QueryEvictor{ + monitor: monitor, + registry: registry, + cfg: cfg, + logger: logger, + evictionsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_evictions_total", + Help: "Total number of queries evicted due to resource pressure.", + ConstLabels: map[string]string{"component": component}, + }, []string{"resource"}), + } + + e.Service = services.NewBasicService(nil, e.running, nil) + return e, nil +} + +// running is the main loop (called by services.Service). +func (e *QueryEvictor) running(ctx context.Context) error { + ticker := time.NewTicker(e.cfg.CheckInterval) + defer ticker.Stop() + + cooldownRemaining := 0 + + for { + select { + case <-ctx.Done(): + return nil + + case <-ticker.C: + // If in cooldown, decrement and skip this tick. + if cooldownRemaining > 0 { + cooldownRemaining-- + continue + } + + // Check system-wide resource utilization. + breachedResource, utilization, threshold := e.checkThresholds() + if breachedResource == "" { + continue // no breach + } + + // Find the heaviest running query. 
+ heaviest := e.registry.FindHeaviest(e.cfg.MinQueryAge) + if heaviest == nil { + continue // no running queries to evict + } + + // Evict the heaviest query. + metricValue := e.registry.metric(heaviest.Stats) + heaviest.Cancel() + + // Log the eviction. + level.Warn(e.logger).Log( + "msg", "evicting heaviest query due to resource pressure", + "resource", breachedResource, + "utilization", utilization, + "threshold", threshold, + "request_id", heaviest.RequestID, + "query", heaviest.QueryExpr, + "user", heaviest.UserID, + "metric", e.cfg.EvictionMetric, + "metric_value", metricValue, + ) + + // Increment metrics. + e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc() + + // Enter cooldown. + cooldownRemaining = e.cfg.CooldownPeriod + } + } +} + +// checkThresholds returns the first breached resource type, its current +// utilization, and the configured threshold. Returns ("", 0, 0) if no breach. +// CPU is checked before heap (deterministic priority). +func (e *QueryEvictor) checkThresholds() (resource.Type, float64, float64) { + if e.cfg.CPUUtilization > 0 { + cpuUtil := e.monitor.GetCPUUtilization() + if cpuUtil >= e.cfg.CPUUtilization { + return resource.CPU, cpuUtil, e.cfg.CPUUtilization + } + } + + if e.cfg.HeapUtilization > 0 { + heapUtil := e.monitor.GetHeapUtilization() + if heapUtil >= e.cfg.HeapUtilization { + return resource.Heap, heapUtil, e.cfg.HeapUtilization + } + } + + return "", 0, 0 +} diff --git a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go new file mode 100644 index 00000000000..cf764604909 --- /dev/null +++ b/pkg/util/queryeviction/evictor_test.go @@ -0,0 +1,233 @@ +package queryeviction + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + querier_stats 
"github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util/resource" + "github.com/cortexproject/cortex/pkg/util/services" +) + +type mockMonitor struct { + cpuUtil atomic.Value + heapUtil atomic.Value +} + +func newMockMonitor(cpu, heap float64) *mockMonitor { + m := &mockMonitor{} + m.cpuUtil.Store(cpu) + m.heapUtil.Store(heap) + return m +} + +func (m *mockMonitor) GetCPUUtilization() float64 { return m.cpuUtil.Load().(float64) } +func (m *mockMonitor) GetHeapUtilization() float64 { return m.heapUtil.Load().(float64) } + +func testEvictorConfig(cpu, heap float64, cooldown int) EvictionConfig { + return EvictionConfig{ + CPUUtilization: cpu, + HeapUtilization: heap, + CheckInterval: 10 * time.Millisecond, + CooldownPeriod: cooldown, + EvictionMetric: "fetched_samples", + } +} + +// startEvictor creates and starts an evictor, returning it and a cleanup function. +func startEvictor(t *testing.T, mon *mockMonitor, reg *QueryRegistry, cfg EvictionConfig) *QueryEvictor { + t.Helper() + evictor, err := NewQueryEvictor(mon, reg, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + require.NoError(t, err) + require.NotNil(t, evictor) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), evictor)) + t.Cleanup(func() { services.StopAndAwaitTerminated(context.Background(), evictor) }) //nolint:errcheck + return evictor +} + +// registerTestQuery registers a query and returns a channel closed on eviction. 
+func registerTestQuery(reg *QueryRegistry, fetchedSamples uint64, expr, user string) (uint64, chan struct{}) { + ctx, cancel := context.WithCancel(context.Background()) + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(fetchedSamples) + + evicted := make(chan struct{}) + id := reg.Register(func() { cancel(); close(evicted) }, stats, expr, user, "") + _ = ctx + return id, evicted +} + +func waitEvicted(t *testing.T, ch chan struct{}) { + t.Helper() + select { + case <-ch: + case <-time.After(500 * time.Millisecond): + t.Fatal("expected query to be evicted") + } +} + +func assertNotEvicted(t *testing.T, ch chan struct{}, wait time.Duration) { + t.Helper() + select { + case <-ch: + t.Fatal("query should not have been evicted") + case <-time.After(wait): + } +} + +func TestEviction_OccursWhenAboveThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) + waitEvicted(t, evicted) +} + +func TestNoEviction_WhenBelowThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.5, 0.5), reg, testEvictorConfig(0.9, 0.9, 0)) + assertNotEvicted(t, evicted, 100*time.Millisecond) + assert.Equal(t, 1, reg.Len()) +} + +func TestCooldown_BlocksEvictionThenResumes(t *testing.T) { + mon := newMockMonitor(0.95, 0.0) + reg := NewQueryRegistry(testMetricFunc) + _, evicted1 := registerTestQuery(reg, 1000, "q1", "user1") + + // cooldown=3 ticks × 10ms = 30ms + startEvictor(t, mon, reg, testEvictorConfig(0.9, 0, 3)) + waitEvicted(t, evicted1) + + // Second query registered during cooldown should not be evicted immediately. + _, evicted2 := registerTestQuery(reg, 2000, "q2", "user2") + assertNotEvicted(t, evicted2, 20*time.Millisecond) + + // But should be evicted after cooldown expires. 
+ waitEvicted(t, evicted2) +} + +func TestCPUOnlyThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0, 0)) + waitEvicted(t, evicted) +} + +func TestHeapOnlyThreshold(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + startEvictor(t, newMockMonitor(0.0, 0.95), reg, testEvictorConfig(0, 0.9, 0)) + waitEvicted(t, evicted) +} + +func TestCPUCheckedBeforeHeap(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evicted := registerTestQuery(reg, 1000, "up", "user1") + evictor := startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0.9, 0)) + waitEvicted(t, evicted) + + assert.Equal(t, float64(1), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.Heap)))) +} + +func TestEmptyRegistry_NoPanic(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + evictor := startEvictor(t, newMockMonitor(0.95, 0.95), reg, testEvictorConfig(0.9, 0.9, 0)) + + time.Sleep(50 * time.Millisecond) + + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) + assert.Equal(t, float64(0), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.Heap)))) +} + +func TestCheckThresholds(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + + tests := map[string]struct { + cpuUtil, heapUtil float64 + cpuThresh, heapThresh float64 + wantResource resource.Type + wantUtil, wantThreshold float64 + }{ + "CPU breached": {0.95, 0.5, 0.9, 0.9, resource.CPU, 0.95, 0.9}, + "Heap breached": {0.5, 0.95, 0.9, 0.9, resource.Heap, 0.95, 0.9}, + "Both breached, CPU first": {0.92, 0.93, 0.9, 0.9, resource.CPU, 0.92, 0.9}, + "Neither breached": {0.5, 0.5, 0.9, 
0.9, "", 0, 0}, + "CPU disabled, heap breached": {0.95, 0.95, 0, 0.9, resource.Heap, 0.95, 0.9}, + "Heap disabled, CPU breached": {0.95, 0.95, 0.9, 0, resource.CPU, 0.95, 0.9}, + "Exact threshold triggers": {0.9, 0.5, 0.9, 0.9, resource.CPU, 0.9, 0.9}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + mon := newMockMonitor(tc.cpuUtil, tc.heapUtil) + cfg := EvictionConfig{ + CPUUtilization: tc.cpuThresh, + HeapUtilization: tc.heapThresh, + CheckInterval: time.Second, + EvictionMetric: "fetched_samples", + } + + var evictor *QueryEvictor + if cfg.Enabled() { + var err error + evictor, err = NewQueryEvictor(mon, reg, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + require.NoError(t, err) + } else { + evictor = &QueryEvictor{monitor: mon, cfg: cfg} + } + + resType, util, thresh := evictor.checkThresholds() + assert.Equal(t, tc.wantResource, resType) + assert.Equal(t, tc.wantUtil, util) + assert.Equal(t, tc.wantThreshold, thresh) + }) + } +} + +func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + evictor := startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) + + for i := 0; i < 3; i++ { + _, evicted := registerTestQuery(reg, uint64(1000+i), "q", "user") + waitEvicted(t, evicted) + } + + assert.Equal(t, float64(3), promtest.ToFloat64(evictor.evictionsTotal.WithLabelValues(string(resource.CPU)))) +} + +func TestNewQueryEvictor_ReturnsNilWhenDisabled(t *testing.T) { + cfg := EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"} + evictor, err := NewQueryEvictor(newMockMonitor(0, 0), NewQueryRegistry(testMetricFunc), cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") + assert.NoError(t, err) + assert.Nil(t, evictor) +} + +func TestEviction_HeaviestQueryIsEvicted(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evictedSmall := registerTestQuery(reg, 100, "small", "user1") + _, evictedLarge := 
registerTestQuery(reg, 10000, "large", "user2") + _, evictedMedium := registerTestQuery(reg, 500, "medium", "user3") + + startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) + + select { + case <-evictedLarge: + case <-evictedSmall: + t.Fatal("small query evicted before heaviest") + case <-evictedMedium: + t.Fatal("medium query evicted before heaviest") + case <-time.After(500 * time.Millisecond): + t.Fatal("expected heaviest query to be evicted") + } +} diff --git a/pkg/util/queryeviction/registry.go b/pkg/util/queryeviction/registry.go new file mode 100644 index 00000000000..c5099579b36 --- /dev/null +++ b/pkg/util/queryeviction/registry.go @@ -0,0 +1,138 @@ +package queryeviction + +import ( + "context" + "fmt" + "sync" + "time" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/gogo/status" + "google.golang.org/grpc/codes" +) + +// ErrQueryEvicted is returned when a query is cancelled by the evictor. +type ErrQueryEvicted struct{} + +func (e *ErrQueryEvicted) Error() string { + return status.Error(codes.ResourceExhausted, "resource limit reached").Error() +} + +// QueryEntry represents a single running query in the registry. +type QueryEntry struct { + QueryID uint64 + Cancel context.CancelFunc + Stats *querier_stats.QueryStats + QueryExpr string // PromQL expression for logging + UserID string // tenant ID for logging/metrics + RequestID string // request ID for correlation + RegisteredAt time.Time +} + +// MetricFunc extracts a comparable weight value from QueryStats. +// Higher values mean "heavier" query. +type MetricFunc func(s *querier_stats.QueryStats) uint64 + +// QueryRegistry tracks all currently running queries. +type QueryRegistry struct { + mu sync.RWMutex + queries map[uint64]*QueryEntry + nextID uint64 + metric MetricFunc // configurable: default is LoadPeakSamples +} + +// NewQueryRegistry creates a registry with the given metric function. 
+func NewQueryRegistry(metric MetricFunc) *QueryRegistry { + return &QueryRegistry{ + queries: make(map[uint64]*QueryEntry), + metric: metric, + } +} + +// Register adds a running query and returns its unique, monotonically increasing ID. +func (r *QueryRegistry) Register(cancel context.CancelFunc, stats *querier_stats.QueryStats, queryExpr string, userID string, requestID string) uint64 { + r.mu.Lock() + defer r.mu.Unlock() + + r.nextID++ + id := r.nextID + + r.queries[id] = &QueryEntry{ + QueryID: id, + Cancel: cancel, + Stats: stats, + QueryExpr: queryExpr, + UserID: userID, + RequestID: requestID, + RegisteredAt: time.Now(), + } + + return id +} + +// Deregister removes a query from the registry. +// It is a no-op if the ID is not found. +func (r *QueryRegistry) Deregister(id uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.queries, id) +} + +// FindHeaviest returns the entry with the highest metric value +// among queries that have been running for at least minAge, +// or nil if no eligible queries exist. +func (r *QueryRegistry) FindHeaviest(minAge time.Duration) *QueryEntry { + r.mu.RLock() + defer r.mu.RUnlock() + + var heaviest *QueryEntry + var maxWeight uint64 + now := time.Now() + + for _, entry := range r.queries { + if now.Sub(entry.RegisteredAt) < minAge { + continue + } + weight := r.metric(entry.Stats) + if heaviest == nil || weight > maxWeight { + heaviest = entry + maxWeight = weight + } + } + + return heaviest +} + +// Len returns the number of currently registered queries. +func (r *QueryRegistry) Len() int { + r.mu.RLock() + defer r.mu.RUnlock() + + return len(r.queries) +} + +// ResolveMetricFunc returns the MetricFunc for the given metric name. +// An empty string defaults to "fetched_samples". 
+func ResolveMetricFunc(metricName string) (MetricFunc, error) { + switch metricName { + case "fetched_samples", "": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSamples() + }, nil + case "fetched_series": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSeries() + }, nil + case "fetched_chunks": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedChunks() + }, nil + case "fetched_chunk_bytes": + return func(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedChunkBytes() + }, nil + default: + return nil, fmt.Errorf("unsupported eviction metric: %s", metricName) + } +} diff --git a/pkg/util/queryeviction/registry_test.go b/pkg/util/queryeviction/registry_test.go new file mode 100644 index 00000000000..57ea2347d2e --- /dev/null +++ b/pkg/util/queryeviction/registry_test.go @@ -0,0 +1,198 @@ +package queryeviction + +import ( + "context" + "sync" + "testing" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestStats creates a QueryStats with the given fetched samples value. +func newTestStats(fetchedSamples uint64) *querier_stats.QueryStats { + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(fetchedSamples) + return stats +} + +// testMetricFunc is a MetricFunc that returns FetchedSamples for testing. +func testMetricFunc(s *querier_stats.QueryStats) uint64 { + return s.LoadFetchedSamples() +} + +func TestRegister_UniqueMonotonicallyIncreasingIDs(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + ids := make([]uint64, 10) + for i := 0; i < 10; i++ { + ids[i] = reg.Register(cancel, newTestStats(0), "query", "user", "") + } + + for i := 1; i < len(ids); i++ { + assert.Greater(t, ids[i], ids[i-1], "IDs should be monotonically increasing") + } + + // Verify all IDs are unique. 
+ seen := make(map[uint64]bool) + for _, id := range ids { + assert.False(t, seen[id], "ID %d should be unique", id) + seen[id] = true + } +} + +func TestDeregister_RemovesEntry(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + id := reg.Register(cancel, newTestStats(100), "query", "user", "") + require.Equal(t, 1, reg.Len()) + + // FindHeaviest should return the registered entry. + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, id, heaviest.QueryID) + + // Deregister and verify it's gone. + reg.Deregister(id) + assert.Equal(t, 0, reg.Len()) + assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil after deregistering the only entry") +} + +func TestDeregister_UnknownID_IsNoOp(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + id := reg.Register(cancel, newTestStats(100), "query", "user", "") + + // Deregister an ID that was never registered. + reg.Deregister(99999) + + // Original entry should still be present. 
+ assert.Equal(t, 1, reg.Len()) + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, id, heaviest.QueryID) +} + +func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + reg.Register(cancel, newTestStats(100), "small-query", "user1", "") + reg.Register(cancel, newTestStats(500), "medium-query", "user2", "") + heaviestID := reg.Register(cancel, newTestStats(1000), "large-query", "user3", "") + reg.Register(cancel, newTestStats(200), "another-query", "user4", "") + + heaviest := reg.FindHeaviest(0) + require.NotNil(t, heaviest) + assert.Equal(t, heaviestID, heaviest.QueryID) + assert.Equal(t, "large-query", heaviest.QueryExpr) + assert.Equal(t, uint64(1000), heaviest.Stats.LoadFetchedSamples()) +} + +func TestFindHeaviest_EmptyRegistry(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil for empty registry") +} + +func TestLen_ReflectsCurrentCount(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + assert.Equal(t, 0, reg.Len()) + + id1 := reg.Register(cancel, newTestStats(10), "q1", "u1", "") + assert.Equal(t, 1, reg.Len()) + + id2 := reg.Register(cancel, newTestStats(20), "q2", "u2", "") + assert.Equal(t, 2, reg.Len()) + + id3 := reg.Register(cancel, newTestStats(30), "q3", "u3", "") + assert.Equal(t, 3, reg.Len()) + + reg.Deregister(id2) + assert.Equal(t, 2, reg.Len()) + + reg.Deregister(id1) + assert.Equal(t, 1, reg.Len()) + + reg.Deregister(id3) + assert.Equal(t, 0, reg.Len()) +} + +func TestConcurrent_RegisterDeregisterFindHeaviest(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + + const goroutines = 20 + const opsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func() { + defer 
wg.Done() + for i := 0; i < opsPerGoroutine; i++ { + _, cancel := context.WithCancel(context.Background()) + stats := newTestStats(uint64(i)) + id := reg.Register(cancel, stats, "concurrent-query", "user", "") + + // Interleave FindHeaviest and Len calls. + _ = reg.FindHeaviest(0) + _ = reg.Len() + + reg.Deregister(id) + cancel() + } + }() + } + + wg.Wait() + + // After all goroutines complete, registry should be empty. + assert.Equal(t, 0, reg.Len()) +} + +func TestResolveMetricFunc_AllSupportedMetrics(t *testing.T) { + stats := &querier_stats.QueryStats{} + stats.AddFetchedSamples(20) + stats.AddFetchedSeries(30) + stats.AddFetchedChunks(40) + stats.AddFetchedChunkBytes(50) + + tests := []struct { + name string + metricName string + expectedValue uint64 + }{ + {name: "fetched_samples", metricName: "fetched_samples", expectedValue: 20}, + {name: "empty string defaults to fetched_samples", metricName: "", expectedValue: 20}, + {name: "fetched_series", metricName: "fetched_series", expectedValue: 30}, + {name: "fetched_chunks", metricName: "fetched_chunks", expectedValue: 40}, + {name: "fetched_chunk_bytes", metricName: "fetched_chunk_bytes", expectedValue: 50}, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fn, err := ResolveMetricFunc(tc.metricName) + require.NoError(t, err) + require.NotNil(t, fn) + assert.Equal(t, tc.expectedValue, fn(stats)) + }) + } +} + +func TestResolveMetricFunc_UnsupportedMetric(t *testing.T) { + fn, err := ResolveMetricFunc("unknown_metric") + assert.Error(t, err) + assert.Nil(t, fn) + assert.Contains(t, err.Error(), "unsupported eviction metric") +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index ed2549651f7..63d7339902e 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4881,6 +4881,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": 
"EXPERIMENTAL: How frequently the evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "ingester.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 
0 to disable.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": { @@ -6044,6 +6092,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": "EXPERIMENTAL: How frequently the evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "querier.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 
0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": { @@ -8410,6 +8506,54 @@ }, "query_protection": { "properties": { + "eviction": { + "properties": { + "check_interval": { + "default": "1s", + "description": "EXPERIMENTAL: How frequently the evictor checks system resource utilization.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.check-interval", + "x-format": "duration" + }, + "cooldown_period": { + "default": 3, + "description": "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.cooldown-period" + }, + "eviction_metric": { + "default": "fetched_samples", + "description": "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.eviction-metric" + }, + "min_query_age": { + "default": "10s", + "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. 
Queries younger than this are ignored.", + "type": "string", + "x-cli-flag": "store-gateway.query-protection.eviction.min-query-age", + "x-format": "duration" + }, + "threshold": { + "properties": { + "cpu_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max CPU utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.threshold.cpu-utilization" + }, + "heap_utilization": { + "default": 0, + "description": "EXPERIMENTAL: Max heap utilization that this instance can reach before evicting the heaviest running query (across all tenants) in percentage, between 0 and 1. monitored_resources config must include the resource type. 0 to disable.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.threshold.heap-utilization" + } + }, + "type": "object" + } + }, + "type": "object" + }, "rejection": { "properties": { "threshold": { From f64ba8d94f6073d60677fd64985ee1b64cb8b250 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 15:09:11 -0700 Subject: [PATCH 02/11] update changelog Signed-off-by: Essam Eldaly --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7815bb58495..16430496052 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. 
#7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 +* [FEATURE] Querier: Add resource-based query eviction that automatically cancels the heaviest running query when CPU or heap utilization exceeds configured thresholds. #7488 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 * [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401 From 4938f3fc95d83eaefd7747ea7fd96ae9117793ff Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 15:12:01 -0700 Subject: [PATCH 03/11] update guarantees Signed-off-by: Essam Eldaly --- docs/configuration/v1-guarantees.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index be3ee78ce59..105ab8ece2e 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -129,3 +129,10 @@ Currently experimental features are: - `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality - `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes - HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache. 
+- Querier: Resource-based query eviction + - `-querier.query-protection.eviction.threshold.cpu-utilization` (float) + - `-querier.query-protection.eviction.threshold.heap-utilization` (float) + - `-querier.query-protection.eviction.check-interval` (duration) + - `-querier.query-protection.eviction.cooldown-period` (int) + - `-querier.query-protection.eviction.eviction-metric` (string) + - `-querier.query-protection.eviction.min-query-age` (duration) From cb9de7d33ea7b6ae48a3eb773fef9444f9da6718 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 15:16:30 -0700 Subject: [PATCH 04/11] lint Signed-off-by: Essam Eldaly --- pkg/util/queryeviction/registry.go | 3 ++- pkg/util/queryeviction/registry_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/util/queryeviction/registry.go b/pkg/util/queryeviction/registry.go index c5099579b36..89dd6cc0b62 100644 --- a/pkg/util/queryeviction/registry.go +++ b/pkg/util/queryeviction/registry.go @@ -6,9 +6,10 @@ import ( "sync" "time" - querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/gogo/status" "google.golang.org/grpc/codes" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" ) // ErrQueryEvicted is returned when a query is cancelled by the evictor. diff --git a/pkg/util/queryeviction/registry_test.go b/pkg/util/queryeviction/registry_test.go index 57ea2347d2e..1db5ff72078 100644 --- a/pkg/util/queryeviction/registry_test.go +++ b/pkg/util/queryeviction/registry_test.go @@ -5,9 +5,10 @@ import ( "sync" "testing" - querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" ) // newTestStats creates a QueryStats with the given fetched samples value. 
From 0a01fbc82635a5fe8eb8fa66a40d9e8a8d70fd83 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 15:25:20 -0700 Subject: [PATCH 05/11] lint atomic Signed-off-by: Essam Eldaly --- pkg/util/queryeviction/evictor_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go index cf764604909..d73200b0707 100644 --- a/pkg/util/queryeviction/evictor_test.go +++ b/pkg/util/queryeviction/evictor_test.go @@ -2,7 +2,6 @@ package queryeviction import ( "context" - "sync/atomic" "testing" "time" @@ -11,6 +10,7 @@ import ( promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util/resource" @@ -18,8 +18,8 @@ import ( ) type mockMonitor struct { - cpuUtil atomic.Value - heapUtil atomic.Value + cpuUtil atomic.Float64 + heapUtil atomic.Float64 } func newMockMonitor(cpu, heap float64) *mockMonitor { @@ -29,8 +29,8 @@ func newMockMonitor(cpu, heap float64) *mockMonitor { return m } -func (m *mockMonitor) GetCPUUtilization() float64 { return m.cpuUtil.Load().(float64) } -func (m *mockMonitor) GetHeapUtilization() float64 { return m.heapUtil.Load().(float64) } +func (m *mockMonitor) GetCPUUtilization() float64 { return m.cpuUtil.Load() } +func (m *mockMonitor) GetHeapUtilization() float64 { return m.heapUtil.Load() } func testEvictorConfig(cpu, heap float64, cooldown int) EvictionConfig { return EvictionConfig{ From 1de552c06273621a084c31825fe67ed7d0f79a1c Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Thu, 7 May 2026 16:06:55 -0700 Subject: [PATCH 06/11] lint modernize Signed-off-by: Essam Eldaly --- pkg/util/queryeviction/evictor_test.go | 2 +- pkg/util/queryeviction/registry_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go index d73200b0707..4a104ee06a1 100644 --- a/pkg/util/queryeviction/evictor_test.go +++ b/pkg/util/queryeviction/evictor_test.go @@ -198,7 +198,7 @@ func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) { reg := NewQueryRegistry(testMetricFunc) evictor := startEvictor(t, newMockMonitor(0.95, 0.0), reg, testEvictorConfig(0.9, 0, 0)) - for i := 0; i < 3; i++ { + for i := range 3 { _, evicted := registerTestQuery(reg, uint64(1000+i), "q", "user") waitEvicted(t, evicted) } diff --git a/pkg/util/queryeviction/registry_test.go b/pkg/util/queryeviction/registry_test.go index 1db5ff72078..c52181809c4 100644 --- a/pkg/util/queryeviction/registry_test.go +++ b/pkg/util/queryeviction/registry_test.go @@ -29,7 +29,7 @@ func TestRegister_UniqueMonotonicallyIncreasingIDs(t *testing.T) { defer cancel() ids := make([]uint64, 10) - for i := 0; i < 10; i++ { + for i := range 10 { ids[i] = reg.Register(cancel, newTestStats(0), "query", "user", "") } @@ -138,10 +138,10 @@ func TestConcurrent_RegisterDeregisterFindHeaviest(t *testing.T) { var wg sync.WaitGroup wg.Add(goroutines) - for g := 0; g < goroutines; g++ { + for range goroutines { go func() { defer wg.Done() - for i := 0; i < opsPerGoroutine; i++ { + for i := range opsPerGoroutine { _, cancel := context.WithCancel(context.Background()) stats := newTestStats(uint64(i)) id := reg.Register(cancel, stats, "concurrent-query", "user", "") From 9bb026995a7002ab058c87a68c6fc0450371720e Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 11 May 2026 08:42:02 -0700 Subject: [PATCH 07/11] use configs.evictionConfig instead of copy Signed-off-by: Essam Eldaly --- pkg/querier/querier.go | 13 ++-------- pkg/util/queryeviction/evictor.go | 32 +++++++------------------ pkg/util/queryeviction/evictor_test.go | 33 +++++++++++++++----------- 3 files changed, 30 insertions(+), 48 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go 
index 064a8ca309f..2bbb143d632 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -263,22 +263,13 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor evictionCfg := cfg.QueryProtection.Eviction if evictionCfg.Enabled() && resourceMonitor != nil { - evCfg := queryeviction.EvictionConfig{ - CPUUtilization: evictionCfg.Threshold.CPUUtilization, - HeapUtilization: evictionCfg.Threshold.HeapUtilization, - CheckInterval: evictionCfg.CheckInterval, - CooldownPeriod: evictionCfg.CooldownPeriod, - EvictionMetric: evictionCfg.EvictionMetric, - MinQueryAge: evictionCfg.MinQueryAge, - } - - metricFunc, err := queryeviction.ResolveMetricFunc(evCfg.EvictionMetric) + metricFunc, err := queryeviction.ResolveMetricFunc(evictionCfg.EvictionMetric) if err != nil { level.Error(logger).Log("msg", "invalid eviction metric", "err", err) } else { queryRegistry = queryeviction.NewQueryRegistry(metricFunc) queryEvictor, err = queryeviction.NewQueryEvictor( - resourceMonitor, queryRegistry, evCfg, + resourceMonitor, queryRegistry, evictionCfg, logger, reg, "querier", ) if err != nil { diff --git a/pkg/util/queryeviction/evictor.go b/pkg/util/queryeviction/evictor.go index 08b835f482e..e7305d01c64 100644 --- a/pkg/util/queryeviction/evictor.go +++ b/pkg/util/queryeviction/evictor.go @@ -9,25 +9,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/cortexproject/cortex/pkg/configs" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" ) -// EvictionConfig configures the resource-based query evictor. 
-type EvictionConfig struct { - CPUUtilization float64 `yaml:"cpu_utilization"` - HeapUtilization float64 `yaml:"heap_utilization"` - CheckInterval time.Duration `yaml:"check_interval"` - CooldownPeriod int `yaml:"cooldown_period"` - EvictionMetric string `yaml:"eviction_metric"` - MinQueryAge time.Duration `yaml:"min_query_age"` -} - -// Enabled returns true if at least one threshold is > 0. -func (c EvictionConfig) Enabled() bool { - return c.CPUUtilization > 0 || c.HeapUtilization > 0 -} - // QueryEvictor monitors system-wide resource utilization and evicts // the heaviest running query when thresholds are breached. type QueryEvictor struct { @@ -35,7 +21,7 @@ type QueryEvictor struct { monitor resource.IMonitor registry *QueryRegistry - cfg EvictionConfig + cfg configs.EvictionConfig logger log.Logger // Prometheus metrics @@ -46,7 +32,7 @@ type QueryEvictor struct { func NewQueryEvictor( monitor resource.IMonitor, registry *QueryRegistry, - cfg EvictionConfig, + cfg configs.EvictionConfig, logger log.Logger, reg prometheus.Registerer, component string, @@ -132,17 +118,17 @@ func (e *QueryEvictor) running(ctx context.Context) error { // utilization, and the configured threshold. Returns ("", 0, 0) if no breach. // CPU is checked before heap (deterministic priority). 
func (e *QueryEvictor) checkThresholds() (resource.Type, float64, float64) { - if e.cfg.CPUUtilization > 0 { + if e.cfg.Threshold.CPUUtilization > 0 { cpuUtil := e.monitor.GetCPUUtilization() - if cpuUtil >= e.cfg.CPUUtilization { - return resource.CPU, cpuUtil, e.cfg.CPUUtilization + if cpuUtil >= e.cfg.Threshold.CPUUtilization { + return resource.CPU, cpuUtil, e.cfg.Threshold.CPUUtilization } } - if e.cfg.HeapUtilization > 0 { + if e.cfg.Threshold.HeapUtilization > 0 { heapUtil := e.monitor.GetHeapUtilization() - if heapUtil >= e.cfg.HeapUtilization { - return resource.Heap, heapUtil, e.cfg.HeapUtilization + if heapUtil >= e.cfg.Threshold.HeapUtilization { + return resource.Heap, heapUtil, e.cfg.Threshold.HeapUtilization } } diff --git a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go index 4a104ee06a1..55de8d94b5d 100644 --- a/pkg/util/queryeviction/evictor_test.go +++ b/pkg/util/queryeviction/evictor_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/atomic" + "github.com/cortexproject/cortex/pkg/configs" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/util/resource" "github.com/cortexproject/cortex/pkg/util/services" @@ -32,18 +33,20 @@ func newMockMonitor(cpu, heap float64) *mockMonitor { func (m *mockMonitor) GetCPUUtilization() float64 { return m.cpuUtil.Load() } func (m *mockMonitor) GetHeapUtilization() float64 { return m.heapUtil.Load() } -func testEvictorConfig(cpu, heap float64, cooldown int) EvictionConfig { - return EvictionConfig{ - CPUUtilization: cpu, - HeapUtilization: heap, - CheckInterval: 10 * time.Millisecond, - CooldownPeriod: cooldown, - EvictionMetric: "fetched_samples", +func testEvictorConfig(cpu, heap float64, cooldown int) configs.EvictionConfig { + return configs.EvictionConfig{ + Threshold: configs.Threshold{ + CPUUtilization: cpu, + HeapUtilization: heap, + }, + CheckInterval: 10 * time.Millisecond, + 
CooldownPeriod: cooldown, + EvictionMetric: "fetched_samples", } } // startEvictor creates and starts an evictor, returning it and a cleanup function. -func startEvictor(t *testing.T, mon *mockMonitor, reg *QueryRegistry, cfg EvictionConfig) *QueryEvictor { +func startEvictor(t *testing.T, mon *mockMonitor, reg *QueryRegistry, cfg configs.EvictionConfig) *QueryEvictor { t.Helper() evictor, err := NewQueryEvictor(mon, reg, cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") require.NoError(t, err) @@ -170,11 +173,13 @@ func TestCheckThresholds(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { mon := newMockMonitor(tc.cpuUtil, tc.heapUtil) - cfg := EvictionConfig{ - CPUUtilization: tc.cpuThresh, - HeapUtilization: tc.heapThresh, - CheckInterval: time.Second, - EvictionMetric: "fetched_samples", + cfg := configs.EvictionConfig{ + Threshold: configs.Threshold{ + CPUUtilization: tc.cpuThresh, + HeapUtilization: tc.heapThresh, + }, + CheckInterval: time.Second, + EvictionMetric: "fetched_samples", } var evictor *QueryEvictor @@ -207,7 +212,7 @@ func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) { } func TestNewQueryEvictor_ReturnsNilWhenDisabled(t *testing.T) { - cfg := EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"} + cfg := configs.EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"} evictor, err := NewQueryEvictor(newMockMonitor(0, 0), NewQueryRegistry(testMetricFunc), cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") assert.NoError(t, err) assert.Nil(t, evictor) From 5ddc12b2f12604f967d2c7234cb85a73607c1bcc Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 11 May 2026 08:49:12 -0700 Subject: [PATCH 08/11] Panic on evictor creation failures Signed-off-by: Essam Eldaly --- pkg/querier/querier.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index
2bbb143d632..f97d060d9d2 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -265,16 +265,16 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor if evictionCfg.Enabled() && resourceMonitor != nil { metricFunc, err := queryeviction.ResolveMetricFunc(evictionCfg.EvictionMetric) if err != nil { - level.Error(logger).Log("msg", "invalid eviction metric", "err", err) - } else { - queryRegistry = queryeviction.NewQueryRegistry(metricFunc) - queryEvictor, err = queryeviction.NewQueryEvictor( - resourceMonitor, queryRegistry, evictionCfg, - logger, reg, "querier", - ) - if err != nil { - level.Error(logger).Log("msg", "failed to create query evictor", "err", err) - } + panic(fmt.Sprintf("invalid eviction metric %q: %v", evictionCfg.EvictionMetric, err)) + } + + queryRegistry = queryeviction.NewQueryRegistry(metricFunc) + queryEvictor, err = queryeviction.NewQueryEvictor( + resourceMonitor, queryRegistry, evictionCfg, + logger, reg, "querier", + ) + if err != nil { + panic(fmt.Sprintf("failed to create query evictor: %v", err)) } } From 63708b6653263d85829e844f4c71911d544ba80a Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 11 May 2026 09:30:26 -0700 Subject: [PATCH 09/11] Add support for evicting multiple queries per cycle Signed-off-by: Essam Eldaly --- pkg/configs/query_protection.go | 16 ++++-- pkg/configs/query_protection_test.go | 33 ++++++------ pkg/querier/querier_eviction_test.go | 7 +-- pkg/util/queryeviction/engine_wrapper_test.go | 6 +-- pkg/util/queryeviction/evictor.go | 44 +++++++-------- pkg/util/queryeviction/evictor_test.go | 40 +++++++++++--- pkg/util/queryeviction/registry.go | 53 +++++++++++++++---- pkg/util/queryeviction/registry_test.go | 28 +++++----- 8 files changed, 149 insertions(+), 78 deletions(-) diff --git a/pkg/configs/query_protection.go b/pkg/configs/query_protection.go index e2b1aa2b1a5..13b7c46bc4d 100644 --- a/pkg/configs/query_protection.go +++ b/pkg/configs/query_protection.go @@ 
-36,11 +36,12 @@ type Threshold struct { // EvictionConfig configures the resource-based query evictor. type EvictionConfig struct { - Threshold Threshold `yaml:"threshold"` - CheckInterval time.Duration `yaml:"check_interval"` - CooldownPeriod int `yaml:"cooldown_period"` - EvictionMetric string `yaml:"eviction_metric"` - MinQueryAge time.Duration `yaml:"min_query_age"` + Threshold Threshold `yaml:"threshold"` + CheckInterval time.Duration `yaml:"check_interval"` + CooldownPeriod int `yaml:"cooldown_period"` + EvictionMetric string `yaml:"eviction_metric"` + MinQueryAge time.Duration `yaml:"min_query_age"` + MaxEvictionsPerCycle int `yaml:"max_evictions_per_cycle"` } // Enabled returns true when at least one eviction threshold is greater than 0. @@ -60,6 +61,7 @@ func (cfg *QueryProtection) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix stri f.IntVar(&cfg.Eviction.CooldownPeriod, prefix+"query-protection.eviction.cooldown-period", 3, "EXPERIMENTAL: Number of check intervals to wait after an eviction before evicting again.") f.StringVar(&cfg.Eviction.EvictionMetric, prefix+"query-protection.eviction.eviction-metric", "fetched_samples", "EXPERIMENTAL: The query metric used to determine the heaviest query for eviction. Supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes.") f.DurationVar(&cfg.Eviction.MinQueryAge, prefix+"query-protection.eviction.min-query-age", 10*time.Second, "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. 
Queries younger than this are ignored.") + f.IntVar(&cfg.Eviction.MaxEvictionsPerCycle, prefix+"query-protection.eviction.max-evictions-per-cycle", 1, "EXPERIMENTAL: Maximum number of queries to evict in a single check cycle when resource thresholds are breached.") } func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) error { @@ -104,6 +106,10 @@ func (cfg *QueryProtection) Validate(monitoredResources flagext.StringSliceCSV) return fmt.Errorf("unrecognized eviction_metric %q; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes", cfg.Eviction.EvictionMetric) } + if cfg.Eviction.MaxEvictionsPerCycle < 1 { + return errors.New("eviction max_evictions_per_cycle must be >= 1") + } + if evThreshold.CPUUtilization > 0 && !strings.Contains(monitoredResources.String(), string(resource.CPU)) { return errors.New("monitored_resources config must include \"cpu\" when eviction cpu threshold is set") } diff --git a/pkg/configs/query_protection_test.go b/pkg/configs/query_protection_test.go index ce92399208c..811781c9f4d 100644 --- a/pkg/configs/query_protection_test.go +++ b/pkg/configs/query_protection_test.go @@ -99,10 +99,11 @@ func Test_EvictionConfig_Validation(t *testing.T) { validBase := func() QueryProtection { return QueryProtection{ Eviction: EvictionConfig{ - Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}, - CheckInterval: 1 * time.Second, - CooldownPeriod: 3, - EvictionMetric: "fetched_samples", + Threshold: Threshold{CPUUtilization: 0.8, HeapUtilization: 0.85}, + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + MaxEvictionsPerCycle: 1, }, } } @@ -112,17 +113,18 @@ func Test_EvictionConfig_Validation(t *testing.T) { monitoredResources []string err string }{ - "valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""}, - "cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, 
[]string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, - "cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = -0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, - "heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, - "heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, - "check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"}, - "cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"}, - "unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`}, - "cpu without monitored fails": {func(qp *QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`}, - "heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`}, - "cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""}, + "valid config passes": {func(qp *QueryProtection) {}, []string{"cpu", "heap"}, ""}, + "cpu > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 1.5 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, + "cpu < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.CPUUtilization = 
-0.1 }, []string{"cpu", "heap"}, "eviction cpu_utilization must be between 0 and 1"}, + "heap > 1 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = 2.0 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "heap < 0 fails": {func(qp *QueryProtection) { qp.Eviction.Threshold.HeapUtilization = -0.5 }, []string{"cpu", "heap"}, "eviction heap_utilization must be between 0 and 1"}, + "check_interval 0 fails": {func(qp *QueryProtection) { qp.Eviction.CheckInterval = 0 }, []string{"cpu", "heap"}, "eviction check_interval must be greater than 0 when eviction is enabled"}, + "cooldown < 0 fails": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = -1 }, []string{"cpu", "heap"}, "eviction cooldown_period must be >= 0"}, + "unknown metric fails": {func(qp *QueryProtection) { qp.Eviction.EvictionMetric = "unknown" }, []string{"cpu", "heap"}, `unrecognized eviction_metric "unknown"; supported values: fetched_samples, fetched_series, fetched_chunks, fetched_chunk_bytes`}, + "cpu without monitored fails": {func(qp *QueryProtection) {}, []string{"heap"}, `monitored_resources config must include "cpu" when eviction cpu threshold is set`}, + "heap without monitored fails": {func(qp *QueryProtection) {}, []string{"cpu"}, `monitored_resources config must include "heap" when eviction heap threshold is set`}, + "cooldown 0 is valid": {func(qp *QueryProtection) { qp.Eviction.CooldownPeriod = 0 }, []string{"cpu", "heap"}, ""}, + "max_evictions_per_cycle < 1 fails": {func(qp *QueryProtection) { qp.Eviction.MaxEvictionsPerCycle = 0 }, []string{"cpu", "heap"}, "eviction max_evictions_per_cycle must be >= 1"}, "disabled skips interval check": {func(qp *QueryProtection) { qp.Eviction.Threshold = Threshold{} qp.Eviction.CheckInterval = 0 @@ -154,5 +156,6 @@ func Test_RegisterFlagsWithPrefix_EvictionDefaults(t *testing.T) { assert.Equal(t, 1*time.Second, cfg.Eviction.CheckInterval) assert.Equal(t, 3, cfg.Eviction.CooldownPeriod) 
assert.Equal(t, "fetched_samples", cfg.Eviction.EvictionMetric) + assert.Equal(t, 1, cfg.Eviction.MaxEvictionsPerCycle) assert.False(t, cfg.Eviction.Enabled()) } diff --git a/pkg/querier/querier_eviction_test.go b/pkg/querier/querier_eviction_test.go index 8e643ba8fd5..338fb0be546 100644 --- a/pkg/querier/querier_eviction_test.go +++ b/pkg/querier/querier_eviction_test.go @@ -85,9 +85,10 @@ func TestQuerier_EvictionIntegration(t *testing.T) { CPUUtilization: tc.evictionCPU, HeapUtilization: tc.evictionHeap, }, - CheckInterval: 1 * time.Second, - CooldownPeriod: 3, - EvictionMetric: "fetched_samples", + CheckInterval: 1 * time.Second, + CooldownPeriod: 3, + EvictionMetric: "fetched_samples", + MaxEvictionsPerCycle: 1, }, } diff --git a/pkg/util/queryeviction/engine_wrapper_test.go b/pkg/util/queryeviction/engine_wrapper_test.go index 884cf1349b0..bfaea17c14d 100644 --- a/pkg/util/queryeviction/engine_wrapper_test.go +++ b/pkg/util/queryeviction/engine_wrapper_test.go @@ -118,9 +118,9 @@ func TestEngineWrapper_EvictedQueryReturnsErrQueryEvicted(t *testing.T) { mq := &mockQuery{ execFunc: func(ctx context.Context) *promql.Result { // Simulate eviction: find the registered query and cancel it. - heaviest := registry.FindHeaviest(0) - require.NotNil(t, heaviest, "query should be registered during Exec") - heaviest.Cancel() // This cancels the child context, simulating evictor behavior. + victims := registry.FindHeaviest(1, 0) + require.Len(t, victims, 1, "query should be registered during Exec") + victims[0].Cancel() // This cancels the child context, simulating evictor behavior. // The inner query would see a cancelled context and return an error. 
return &promql.Result{Err: context.Canceled} diff --git a/pkg/util/queryeviction/evictor.go b/pkg/util/queryeviction/evictor.go index e7305d01c64..97ad5b653ba 100644 --- a/pkg/util/queryeviction/evictor.go +++ b/pkg/util/queryeviction/evictor.go @@ -82,31 +82,31 @@ func (e *QueryEvictor) running(ctx context.Context) error { continue // no breach } - // Find the heaviest running query. - heaviest := e.registry.FindHeaviest(e.cfg.MinQueryAge) - if heaviest == nil { + // Find the heaviest running queries (up to MaxEvictionsPerCycle). + victims := e.registry.FindHeaviest(e.cfg.MaxEvictionsPerCycle, e.cfg.MinQueryAge) + if len(victims) == 0 { continue // no running queries to evict } - // Evict the heaviest query. - metricValue := e.registry.metric(heaviest.Stats) - heaviest.Cancel() - - // Log the eviction. - level.Warn(e.logger).Log( - "msg", "evicting heaviest query due to resource pressure", - "resource", breachedResource, - "utilization", utilization, - "threshold", threshold, - "request_id", heaviest.RequestID, - "query", heaviest.QueryExpr, - "user", heaviest.UserID, - "metric", e.cfg.EvictionMetric, - "metric_value", metricValue, - ) - - // Increment metrics. - e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc() + // Evict each victim. + for _, victim := range victims { + metricValue := e.registry.metric(victim.Stats) + victim.Cancel() + + level.Warn(e.logger).Log( + "msg", "evicting query due to resource pressure", + "resource", breachedResource, + "utilization", utilization, + "threshold", threshold, + "request_id", victim.RequestID, + "query", victim.QueryExpr, + "user", victim.UserID, + "metric", e.cfg.EvictionMetric, + "metric_value", metricValue, + ) + + e.evictionsTotal.WithLabelValues(string(breachedResource)).Inc() + } // Enter cooldown. 
cooldownRemaining = e.cfg.CooldownPeriod diff --git a/pkg/util/queryeviction/evictor_test.go b/pkg/util/queryeviction/evictor_test.go index 55de8d94b5d..7b102c641a9 100644 --- a/pkg/util/queryeviction/evictor_test.go +++ b/pkg/util/queryeviction/evictor_test.go @@ -39,9 +39,10 @@ func testEvictorConfig(cpu, heap float64, cooldown int) configs.EvictionConfig { CPUUtilization: cpu, HeapUtilization: heap, }, - CheckInterval: 10 * time.Millisecond, - CooldownPeriod: cooldown, - EvictionMetric: "fetched_samples", + CheckInterval: 10 * time.Millisecond, + CooldownPeriod: cooldown, + EvictionMetric: "fetched_samples", + MaxEvictionsPerCycle: 1, } } @@ -178,8 +179,9 @@ func TestCheckThresholds(t *testing.T) { CPUUtilization: tc.cpuThresh, HeapUtilization: tc.heapThresh, }, - CheckInterval: time.Second, - EvictionMetric: "fetched_samples", + CheckInterval: time.Second, + EvictionMetric: "fetched_samples", + MaxEvictionsPerCycle: 1, } var evictor *QueryEvictor @@ -212,7 +214,7 @@ func TestPrometheusMetrics_IncrementedCorrectly(t *testing.T) { } func TestNewQueryEvictor_ReturnsNilWhenDisabled(t *testing.T) { - cfg := configs.EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples"} + cfg := configs.EvictionConfig{CheckInterval: time.Second, EvictionMetric: "fetched_samples", MaxEvictionsPerCycle: 1} evictor, err := NewQueryEvictor(newMockMonitor(0, 0), NewQueryRegistry(testMetricFunc), cfg, log.NewNopLogger(), prometheus.NewPedanticRegistry(), "test") assert.NoError(t, err) assert.Nil(t, evictor) @@ -236,3 +238,29 @@ func TestEviction_HeaviestQueryIsEvicted(t *testing.T) { t.Fatal("expected heaviest query to be evicted") } } + +func TestEviction_MultipleQueriesPerCycle(t *testing.T) { + reg := NewQueryRegistry(testMetricFunc) + _, evictedSmall := registerTestQuery(reg, 100, "small", "user1") + _, evictedLarge := registerTestQuery(reg, 10000, "large", "user2") + _, evictedMedium := registerTestQuery(reg, 500, "medium", "user3") + + cfg := 
configs.EvictionConfig{ + Threshold: configs.Threshold{ + CPUUtilization: 0.9, + }, + CheckInterval: 10 * time.Millisecond, + CooldownPeriod: 300, + EvictionMetric: "fetched_samples", + MaxEvictionsPerCycle: 2, + } + + startEvictor(t, newMockMonitor(0.95, 0.0), reg, cfg) + + // The two heaviest (large=10000, medium=500) should be evicted in the same cycle. + waitEvicted(t, evictedLarge) + waitEvicted(t, evictedMedium) + + // The smallest should not be evicted because max per cycle is 2 and cooldown prevents another cycle. + assertNotEvicted(t, evictedSmall, 50*time.Millisecond) +} diff --git a/pkg/util/queryeviction/registry.go b/pkg/util/queryeviction/registry.go index 89dd6cc0b62..870486b5aa0 100644 --- a/pkg/util/queryeviction/registry.go +++ b/pkg/util/queryeviction/registry.go @@ -1,6 +1,7 @@ package queryeviction import ( + "container/heap" "context" "fmt" "sync" @@ -80,29 +81,61 @@ func (r *QueryRegistry) Deregister(id uint64) { delete(r.queries, id) } -// FindHeaviest returns the entry with the highest metric value +// FindHeaviest returns up to n entries with the highest metric values // among queries that have been running for at least minAge, -// or nil if no eligible queries exist. -func (r *QueryRegistry) FindHeaviest(minAge time.Duration) *QueryEntry { +// sorted heaviest first. Returns nil if no eligible queries exist. 
+func (r *QueryRegistry) FindHeaviest(n int, minAge time.Duration) []*QueryEntry { r.mu.RLock() defer r.mu.RUnlock() - var heaviest *QueryEntry - var maxWeight uint64 now := time.Now() + // Use a min-heap of size n to find the top-n queries + h := &weightedHeap{} + for _, entry := range r.queries { if now.Sub(entry.RegisteredAt) < minAge { continue } - weight := r.metric(entry.Stats) - if heaviest == nil || weight > maxWeight { - heaviest = entry - maxWeight = weight + w := weighted{entry: entry, weight: r.metric(entry.Stats)} + + if h.Len() < n { + heap.Push(h, w) + } else if w.weight > (*h)[0].weight { + (*h)[0] = w + heap.Fix(h, 0) } } - return heaviest + if h.Len() == 0 { + return nil + } + + result := make([]*QueryEntry, h.Len()) + for i := len(result) - 1; i >= 0; i-- { + result[i] = heap.Pop(h).(weighted).entry + } + return result +} + +// weightedHeap is a min-heap of weighted entries (smallest weight at root). +type weightedHeap []weighted + +type weighted struct { + entry *QueryEntry + weight uint64 +} + +func (h weightedHeap) Len() int { return len(h) } +func (h weightedHeap) Less(i, j int) bool { return h[i].weight < h[j].weight } +func (h weightedHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *weightedHeap) Push(x interface{}) { *h = append(*h, x.(weighted)) } +func (h *weightedHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x } // Len returns the number of currently registered queries. diff --git a/pkg/util/queryeviction/registry_test.go b/pkg/util/queryeviction/registry_test.go index c52181809c4..679df2dee4d 100644 --- a/pkg/util/queryeviction/registry_test.go +++ b/pkg/util/queryeviction/registry_test.go @@ -54,14 +54,14 @@ func TestDeregister_RemovesEntry(t *testing.T) { require.Equal(t, 1, reg.Len()) // FindHeaviest should return the registered entry. 
- heaviest := reg.FindHeaviest(0) - require.NotNil(t, heaviest) - assert.Equal(t, id, heaviest.QueryID) + results := reg.FindHeaviest(1, 0) + require.Len(t, results, 1) + assert.Equal(t, id, results[0].QueryID) // Deregister and verify it's gone. reg.Deregister(id) assert.Equal(t, 0, reg.Len()) - assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil after deregistering the only entry") + assert.Nil(t, reg.FindHeaviest(1, 0), "FindHeaviest should return nil after deregistering the only entry") } func TestDeregister_UnknownID_IsNoOp(t *testing.T) { @@ -76,9 +76,9 @@ func TestDeregister_UnknownID_IsNoOp(t *testing.T) { // Original entry should still be present. assert.Equal(t, 1, reg.Len()) - heaviest := reg.FindHeaviest(0) - require.NotNil(t, heaviest) - assert.Equal(t, id, heaviest.QueryID) + results := reg.FindHeaviest(1, 0) + require.Len(t, results, 1) + assert.Equal(t, id, results[0].QueryID) } func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) { @@ -91,16 +91,16 @@ func TestFindHeaviest_ReturnsHighestMetricValue(t *testing.T) { heaviestID := reg.Register(cancel, newTestStats(1000), "large-query", "user3", "") reg.Register(cancel, newTestStats(200), "another-query", "user4", "") - heaviest := reg.FindHeaviest(0) - require.NotNil(t, heaviest) - assert.Equal(t, heaviestID, heaviest.QueryID) - assert.Equal(t, "large-query", heaviest.QueryExpr) - assert.Equal(t, uint64(1000), heaviest.Stats.LoadFetchedSamples()) + results := reg.FindHeaviest(1, 0) + require.Len(t, results, 1) + assert.Equal(t, heaviestID, results[0].QueryID) + assert.Equal(t, "large-query", results[0].QueryExpr) + assert.Equal(t, uint64(1000), results[0].Stats.LoadFetchedSamples()) } func TestFindHeaviest_EmptyRegistry(t *testing.T) { reg := NewQueryRegistry(testMetricFunc) - assert.Nil(t, reg.FindHeaviest(0), "FindHeaviest should return nil for empty registry") + assert.Nil(t, reg.FindHeaviest(1, 0), "FindHeaviest should return nil for empty registry") } func 
TestLen_ReflectsCurrentCount(t *testing.T) { @@ -147,7 +147,7 @@ func TestConcurrent_RegisterDeregisterFindHeaviest(t *testing.T) { id := reg.Register(cancel, stats, "concurrent-query", "user", "") // Interleave FindHeaviest and Len calls. - _ = reg.FindHeaviest(0) + _ = reg.FindHeaviest(1, 0) _ = reg.Len() reg.Deregister(id) From b4b217b5433349ba9303284cb85e80a296d73948 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 11 May 2026 09:33:06 -0700 Subject: [PATCH 10/11] Doc gen Signed-off-by: Essam Eldaly --- docs/blocks-storage/querier.md | 5 +++++ docs/blocks-storage/store-gateway.md | 5 +++++ docs/configuration/config-file-reference.md | 15 +++++++++++++++ schemas/cortex-config-schema.json | 18 ++++++++++++++++++ 4 files changed, 43 insertions(+) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index b633ddbc8fa..ab9abf4ab0c 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -367,6 +367,11 @@ querier: # eligible for eviction. Queries younger than this are ignored. # CLI flag: -querier.query-protection.eviction.min-query-age [min_query_age: | default = 10s] + + # EXPERIMENTAL: Maximum number of queries to evict in a single check cycle + # when resource thresholds are breached. + # CLI flag: -querier.query-protection.eviction.max-evictions-per-cycle + [max_evictions_per_cycle: | default = 1] ``` ### `blocks_storage_config` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 59250604fa9..01f74e8d00f 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -409,6 +409,11 @@ store_gateway: # CLI flag: -store-gateway.query-protection.eviction.min-query-age [min_query_age: | default = 10s] + # EXPERIMENTAL: Maximum number of queries to evict in a single check cycle + # when resource thresholds are breached. 
+ # CLI flag: -store-gateway.query-protection.eviction.max-evictions-per-cycle + [max_evictions_per_cycle: | default = 1] + hedged_request: # If true, hedged requests are applied to object store calls. It can help # with reducing tail latency. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e6ca704464a..b48533e3e15 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3909,6 +3909,11 @@ query_protection: # eligible for eviction. Queries younger than this are ignored. # CLI flag: -ingester.query-protection.eviction.min-query-age [min_query_age: | default = 10s] + + # EXPERIMENTAL: Maximum number of queries to evict in a single check cycle + # when resource thresholds are breached. + # CLI flag: -ingester.query-protection.eviction.max-evictions-per-cycle + [max_evictions_per_cycle: | default = 1] ``` ### `ingester_client_config` @@ -5090,6 +5095,11 @@ query_protection: # eligible for eviction. Queries younger than this are ignored. # CLI flag: -querier.query-protection.eviction.min-query-age [min_query_age: | default = 10s] + + # EXPERIMENTAL: Maximum number of queries to evict in a single check cycle + # when resource thresholds are breached. + # CLI flag: -querier.query-protection.eviction.max-evictions-per-cycle + [max_evictions_per_cycle: | default = 1] ``` ### `query_frontend_config` @@ -6896,6 +6906,11 @@ query_protection: # CLI flag: -store-gateway.query-protection.eviction.min-query-age [min_query_age: | default = 10s] + # EXPERIMENTAL: Maximum number of queries to evict in a single check cycle + # when resource thresholds are breached. + # CLI flag: -store-gateway.query-protection.eviction.max-evictions-per-cycle + [max_evictions_per_cycle: | default = 1] + hedged_request: # If true, hedged requests are applied to object store calls. It can help with # reducing tail latency. 
diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 63d7339902e..da9bddb2b4e 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -4902,6 +4902,12 @@ "type": "string", "x-cli-flag": "ingester.query-protection.eviction.eviction-metric" }, + "max_evictions_per_cycle": { + "default": 1, + "description": "EXPERIMENTAL: Maximum number of queries to evict in a single check cycle when resource thresholds are breached.", + "type": "number", + "x-cli-flag": "ingester.query-protection.eviction.max-evictions-per-cycle" + }, "min_query_age": { "default": "10s", "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", @@ -6113,6 +6119,12 @@ "type": "string", "x-cli-flag": "querier.query-protection.eviction.eviction-metric" }, + "max_evictions_per_cycle": { + "default": 1, + "description": "EXPERIMENTAL: Maximum number of queries to evict in a single check cycle when resource thresholds are breached.", + "type": "number", + "x-cli-flag": "querier.query-protection.eviction.max-evictions-per-cycle" + }, "min_query_age": { "default": "10s", "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. Queries younger than this are ignored.", @@ -8527,6 +8539,12 @@ "type": "string", "x-cli-flag": "store-gateway.query-protection.eviction.eviction-metric" }, + "max_evictions_per_cycle": { + "default": 1, + "description": "EXPERIMENTAL: Maximum number of queries to evict in a single check cycle when resource thresholds are breached.", + "type": "number", + "x-cli-flag": "store-gateway.query-protection.eviction.max-evictions-per-cycle" + }, "min_query_age": { "default": "10s", "description": "EXPERIMENTAL: Minimum time a query must be running before it becomes eligible for eviction. 
Queries younger than this are ignored.", From 5a3e742826dbd17e1ffdc88cea3794ae61dfbec3 Mon Sep 17 00:00:00 2001 From: Essam Eldaly Date: Mon, 11 May 2026 11:53:06 -0700 Subject: [PATCH 11/11] lint Signed-off-by: Essam Eldaly --- pkg/util/queryeviction/registry.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/util/queryeviction/registry.go b/pkg/util/queryeviction/registry.go index 870486b5aa0..0b499f0b04e 100644 --- a/pkg/util/queryeviction/registry.go +++ b/pkg/util/queryeviction/registry.go @@ -126,11 +126,11 @@ type weighted struct { weight uint64 } -func (h weightedHeap) Len() int { return len(h) } -func (h weightedHeap) Less(i, j int) bool { return h[i].weight < h[j].weight } -func (h weightedHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h *weightedHeap) Push(x interface{}) { *h = append(*h, x.(weighted)) } -func (h *weightedHeap) Pop() interface{} { +func (h weightedHeap) Len() int { return len(h) } +func (h weightedHeap) Less(i, j int) bool { return h[i].weight < h[j].weight } +func (h weightedHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *weightedHeap) Push(x any) { *h = append(*h, x.(weighted)) } +func (h *weightedHeap) Pop() any { old := *h n := len(old) x := old[n-1]