Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/cobaltcore-dev/cortex/internal/scheduling/machines"
"github.com/cobaltcore-dev/cortex/internal/scheduling/manila"
"github.com/cobaltcore-dev/cortex/internal/scheduling/nova"
novafilters "github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/filters"
"github.com/cobaltcore-dev/cortex/internal/scheduling/pods"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/capacity"
Expand Down Expand Up @@ -385,6 +386,12 @@ func main() {
detectorPipelineMonitor := schedulinglib.NewDetectorPipelineMonitor()
metrics.Registry.MustRegister(&detectorPipelineMonitor)

// Filter-specific metrics that don't fit the generic per-step monitor (e.g.
// custom labels). Register them globally so they're available wherever the
// filter runs.
novafilters.QuotaEnforcementMetricsSingleton = novafilters.NewQuotaEnforcementMetrics()
metrics.Registry.MustRegister(novafilters.QuotaEnforcementMetricsSingleton)

// Initialize commitments API for LIQUID interface (Postgres-backed usage reporting).
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()
var commitmentsUsageDB commitments.UsageDBClient
Expand Down
10 changes: 10 additions & 0 deletions helm/bundles/cortex-nova/templates/pipelines_kvm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ spec:
If a matching CommittedResource has unused capacity, the request is accepted.
Otherwise, PAYG headroom is checked for ram, cores, and instances.
Rejects all hosts if neither tier has headroom.
When dryRun is true the filter runs in shadow mode: it logs and emits
the cortex_nova_filter_quota_enforcement_decisions_total metric for
would-be rejects but never actually removes hosts.
params:
- {key: dryRun, boolValue: true}
weighers:
- name: kvm_prefer_smaller_hosts
params:
Expand Down Expand Up @@ -261,6 +266,11 @@ spec:
If a matching CommittedResource has unused capacity, the request is accepted.
Otherwise, PAYG headroom is checked for ram, cores, and instances.
Rejects all hosts if neither tier has headroom.
When dryRun is true the filter runs in shadow mode: it logs and emits
the cortex_nova_filter_quota_enforcement_decisions_total metric for
would-be rejects but never actually removes hosts.
params:
- {key: dryRun, boolValue: true}
weighers:
- name: kvm_prefer_smaller_hosts
params:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ import (
"k8s.io/apimachinery/pkg/types"
)

type FilterQuotaEnforcementOpts struct{}
type FilterQuotaEnforcementOpts struct {
// DryRun, when true, makes the filter run in shadow mode: it performs the
// full headroom analysis and logs/emits metrics for would-be rejects, but
// never actually removes hosts from the result. Use this for safe rollouts
// and to observe cortex_nova_filter_quota_enforcement_decisions_total in
// shadow mode before flipping to enforce.
DryRun bool `json:"dryRun,omitempty"`
}

func (FilterQuotaEnforcementOpts) Validate() error { return nil }

Expand All @@ -44,6 +51,17 @@ func (FilterQuotaEnforcementOpts) Validate() error { return nil }
// On infrastructure errors (e.g. API server unreachable), the filter returns an error
// which causes the pipeline to skip it (fail-open).
//
// Two modes:
// - DryRun=false (enforce, default zero value): on a no-headroom decision the filter
// clears all host activations to globally reject the request.
// - DryRun=true (shadow): the filter performs the same analysis, logs the would-be
// reject, and emits the same decision metric with mode="shadow" — but does NOT
// remove hosts. Use this to observe rejection volumes before enabling enforcement.
//
// Every accept/reject/skip outcome is recorded as a Prometheus counter:
// cortex_nova_filter_quota_enforcement_decisions_total{mode,decision,resource,
// availability_zone,flavor_group}.
//
// Disabled by default; activated by adding "filter_quota_enforcement" to the pipeline config.
type FilterQuotaEnforcement struct {
lib.BaseFilter[api.ExternalSchedulerRequest, FilterQuotaEnforcementOpts]
Expand All @@ -52,19 +70,27 @@ type FilterQuotaEnforcement struct {
func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.ExternalSchedulerRequest) (*lib.FilterWeigherPipelineStepResult, error) {
result := s.IncludeAllHostsFromRequest(request)

mode := "enforce"
if s.Options.DryRun {
mode = "shadow"
}

// Step 1: Skip intents that don't represent new resource consumption.
intent, err := request.GetIntent()
if err == nil {
switch intent {
case api.EvacuateIntent, api.LiveMigrationIntent:
traceLog.Info("skipping quota enforcement for non-consuming intent", "intent", intent)
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", "", "")
return result, nil
case api.ReserveForFailoverIntent:
traceLog.Info("skipping quota enforcement for failover reservation intent")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", "", "")
return result, nil
case api.ReserveForCommittedResourceIntent:
// TODO: revisit whether committed resource reservation scheduling should also be quota-checked
traceLog.Info("skipping quota enforcement for committed resource reservation intent")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", "", "")
return result, nil
}
}
Expand All @@ -76,14 +102,17 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External

if projectID == "" {
traceLog.Warn("no project ID in request, skipping quota enforcement")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", az, hwVersion)
return result, nil
}
if az == "" {
traceLog.Warn("no availability zone in request, skipping quota enforcement")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", "", hwVersion)
return result, nil
}
if hwVersion == "" {
traceLog.Warn("no hw_version in flavor extra specs, skipping quota enforcement")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_skipped", "", az, "")
return result, nil
}

Expand Down Expand Up @@ -188,6 +217,7 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External

if crHasHeadroom {
traceLog.Info("quota enforcement ACCEPT: CR headroom available")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_cr", "ram", az, hwVersion)
return result, nil
}

Expand All @@ -200,6 +230,7 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External
if apierrors.IsNotFound(err) {
traceLog.Info("no ProjectQuota CRD found for project+AZ, skipping enforcement",
"projectID", projectID, "az", az)
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_no_quota", "", az, hwVersion)
return result, nil
}
traceLog.Error("failed to get ProjectQuota", "name", pqName, "error", err)
Expand All @@ -211,14 +242,15 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External
// For RAM: paygHeadroom = Quota - sum(CR amounts) - PaygUsage (CRs reserve RAM capacity)
// For cores/instances: paygHeadroom = Quota - PaygUsage (no CR deduction)
type resourceCheck struct {
name string
name string // liquid resource name, e.g. hw_version_<v>_ram
label string // metric label value: "ram" / "cores" / "instances"
request int64
crDeduct int64
}
checks := []resourceCheck{
{name: resourceRAM, request: requestRAM, crDeduct: matchingCRAmountGiB},
{name: resourceCores, request: requestCores, crDeduct: 0},
{name: resourceInstances, request: requestInstances, crDeduct: 0},
{name: resourceRAM, label: "ram", request: requestRAM, crDeduct: matchingCRAmountGiB},
{name: resourceCores, label: "cores", request: requestCores, crDeduct: 0},
{name: resourceInstances, label: "instances", request: requestInstances, crDeduct: 0},
}

for _, check := range checks {
Expand Down Expand Up @@ -247,9 +279,22 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External
)

if headroom < check.request {
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "reject", check.label, az, hwVersion)
if s.Options.DryRun {
traceLog.Info("quota enforcement SHADOW: would reject but dryRun=true",
"projectID", projectID,
"az", az,
"flavorGroup", hwVersion,
"resource", check.name,
"request", check.request,
"headroom", headroom,
)
return result, nil
}
traceLog.Info("quota enforcement REJECT: no PAYG headroom",
"projectID", projectID,
"az", az,
"flavorGroup", hwVersion,
"resource", check.name,
"request", check.request,
"headroom", headroom,
Expand All @@ -262,6 +307,7 @@ func (s *FilterQuotaEnforcement) Run(traceLog *slog.Logger, request api.External
}

traceLog.Info("quota enforcement ACCEPT: PAYG headroom available")
QuotaEnforcementMetricsSingleton.RecordDecision(mode, "accept_payg", "", az, hwVersion)
return result, nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package filters

import (
"log/slog"
"sync"

"github.com/prometheus/client_golang/prometheus"
)

// QuotaEnforcementMetrics holds Prometheus metrics for the quota-enforcement
// nova filter. The filter runs in either "enforce" mode (rejects when no
// headroom) or "shadow" mode (logs/counts what it would have rejected without
// removing hosts). The decisions counter exposes the same outcomes in both
// modes so operators can compare a shadow rollout against the existing
// generic step metrics (e.g. cortex_filter_weigher_pipeline_step_removed_hosts).
type QuotaEnforcementMetrics struct {
// Decisions is the counter of every accept/reject/skip outcome the filter
// produces. Labels:
// - mode: "enforce" | "shadow"
// - decision: "accept_cr" | "accept_payg" | "accept_no_quota" |
// "accept_skipped" | "reject"
// - resource: "ram" | "cores" | "instances" | "" (empty when
// the decision is not driven by a single resource)
// - availability_zone: AZ string or "" if unknown at decision time
// - flavor_group: hw_version string or "" if unknown at decision time
Decisions *prometheus.CounterVec
}

// NewQuotaEnforcementMetrics constructs the metrics struct. The returned
// *QuotaEnforcementMetrics implements prometheus.Collector, so the caller is
// expected to register it with a registry (typically metrics.Registry from
// cmd/manager). This matches the Monitor pattern used elsewhere in cortex
// (e.g. db.Monitor, LogMetricsMonitor, PipelineMonitor).
func NewQuotaEnforcementMetrics() *QuotaEnforcementMetrics {
return &QuotaEnforcementMetrics{
Decisions: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_nova_filter_quota_enforcement_decisions_total",
Help: "Decisions made by the FilterQuotaEnforcement nova filter, " +
"labeled by enforcement mode, decision outcome, the resource " +
"that drove a reject (if any), availability zone, and flavor group.",
},
[]string{"mode", "decision", "resource", "availability_zone", "flavor_group"},
),
}
}

// Describe implements prometheus.Collector by delegating to the underlying
// counter vec. Nil-safe to mirror RecordDecision's defensive guard: a stray
// MustRegister on an uninitialized struct must not crash the manager.
func (m *QuotaEnforcementMetrics) Describe(ch chan<- *prometheus.Desc) {
if m == nil || m.Decisions == nil {
return
}
m.Decisions.Describe(ch)
}

// Collect implements prometheus.Collector by delegating to the underlying
// counter vec. Nil-safe — Collect runs on every Prometheus scrape, so a nil
// receiver must not panic the metrics endpoint.
func (m *QuotaEnforcementMetrics) Collect(ch chan<- prometheus.Metric) {
if m == nil || m.Decisions == nil {
return
}
m.Decisions.Collect(ch)
}

// recordDecisionNilOnce ensures the "metrics not initialized" warning is
// emitted at most once per process to keep logs clean while still surfacing
// misconfigurations. A pointer is used so tests can swap in a freshly armed
// once without copying a sync.Once value (which is forbidden after first use).
var recordDecisionNilOnce = &sync.Once{}

// RecordDecision is a small helper that nil-guards the singleton and increments
// the decisions counter. Filter Run() uses it directly. If the receiver is nil
// (i.e. QuotaEnforcementMetricsSingleton was never initialized — typically a
// missing wiring step in cmd/manager) we log a warn-level message exactly once
// so the misconfiguration is observable both in logs and via
// cortex_log_messages_total{level="warn"}.
func (m *QuotaEnforcementMetrics) RecordDecision(mode, decision, resource, az, flavorGroup string) {
if m == nil || m.Decisions == nil {
recordDecisionNilOnce.Do(func() {
slog.Warn("QuotaEnforcementMetrics is nil; decision metric not recorded "+
"(is QuotaEnforcementMetricsSingleton initialized in cmd/manager?)",
"mode", mode,
"decision", decision,
"resource", resource,
"availability_zone", az,
"flavor_group", flavorGroup,
)
})
return
}
m.Decisions.WithLabelValues(mode, decision, resource, az, flavorGroup).Inc()
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// QuotaEnforcementMetricsSingleton is set from cmd/manager/main.go during
// initialization. The filter's Run method reads it via RecordDecision, which
// nil-guards itself, so unit tests that construct a bare FilterQuotaEnforcement
// without setting this singleton stay safe.
var QuotaEnforcementMetricsSingleton *QuotaEnforcementMetrics
Loading