diff --git a/cmd/duckgres-controlplane/main.go b/cmd/duckgres-controlplane/main.go index a1ce77de..8cdac03d 100644 --- a/cmd/duckgres-controlplane/main.go +++ b/cmd/duckgres-controlplane/main.go @@ -236,6 +236,7 @@ func main() { WorkerTolerationValue: resolved.K8sWorkerTolerationValue, AllowClientWorkerProfile: resolved.K8sAllowClientWorkerProfile, WorkerPriorityClassName: resolved.K8sWorkerPriorityClassName, + HeadroomNodes: resolved.K8sHeadroomNodes, HeadroomPercent: resolved.K8sHeadroomPercent, PlaceholderImage: resolved.K8sPlaceholderImage, PlaceholderPriorityClassName: resolved.K8sPlaceholderPriorityClassName, diff --git a/configresolve/resolve.go b/configresolve/resolve.go index 241aeef0..e62f04b3 100644 --- a/configresolve/resolve.go +++ b/configresolve/resolve.go @@ -114,6 +114,7 @@ type Resolved struct { K8sWorkerTolerationValue string K8sAllowClientWorkerProfile bool K8sWorkerPriorityClassName string + K8sHeadroomNodes int K8sHeadroomPercent int K8sPlaceholderImage string K8sPlaceholderPriorityClassName string @@ -204,6 +205,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu var k8sWorkerProfileMinCPU, k8sWorkerProfileMaxCPU, k8sWorkerProfileMinMemory, k8sWorkerProfileMaxMemory string var k8sWorkerMaxTTL, k8sWorkerDefaultTTL time.Duration var k8sWorkerPriorityClassName string + var k8sHeadroomNodes int var k8sHeadroomPercent int var k8sPlaceholderImage, k8sPlaceholderPriorityClassName string var k8sWorkerImage, k8sWorkerNamespace, k8sControlPlaneID string @@ -907,6 +909,11 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu if v := getenv("DUCKGRES_K8S_WORKER_PRIORITY_CLASS"); v != "" { k8sWorkerPriorityClassName = v } + if v := getenv("DUCKGRES_K8S_HEADROOM_NODES"); v != "" { + if n, err := strconv.Atoi(v); err == nil { + k8sHeadroomNodes = n + } + } if v := getenv("DUCKGRES_K8S_HEADROOM_PERCENT"); v != "" { if n, err := strconv.Atoi(v); err == nil { k8sHeadroomPercent = n @@ -1277,6 +1284,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu K8sWorkerTolerationValue: k8sWorkerTolerationValue, K8sAllowClientWorkerProfile: k8sAllowClientWorkerProfile, K8sWorkerPriorityClassName: k8sWorkerPriorityClassName, + K8sHeadroomNodes: k8sHeadroomNodes, K8sHeadroomPercent: k8sHeadroomPercent, K8sPlaceholderImage: k8sPlaceholderImage, K8sPlaceholderPriorityClassName: k8sPlaceholderPriorityClassName, diff --git a/controlplane/configstore/lifecycle_store.go b/controlplane/configstore/lifecycle_store.go index aef27e05..bcd5b94a 100644 --- a/controlplane/configstore/lifecycle_store.go +++ b/controlplane/configstore/lifecycle_store.go @@ -41,6 +41,16 @@ func (cs *ConfigStore) ListExpiredHotIdleSnapshots(now time.Time, defaultTTL tim return workerSnapshotsFromRecords(records), nil } +// ListExpiredHotIdleSnapshotsForCP is the snapshot-typed, owner-scoped variant +// of ListExpiredHotIdleWorkers used by the per-CP fallback reaper. +func (cs *ConfigStore) ListExpiredHotIdleSnapshotsForCP(ownerCPInstanceID string, now time.Time, defaultTTL time.Duration) ([]WorkerSnapshot, error) { + records, err := cs.ListExpiredHotIdleWorkersForCP(ownerCPInstanceID, now, defaultTTL) + if err != nil { + return nil, err + } + return workerSnapshotsFromRecords(records), nil +} + // ListStuckWorkerSnapshots is the snapshot-typed variant of // ListStuckWorkers used by the janitor's stuck-spawn/activate reaper. func (cs *ConfigStore) ListStuckWorkerSnapshots(spawningBefore, activatingBefore time.Time) ([]WorkerSnapshot, error) { diff --git a/controlplane/configstore/store.go b/controlplane/configstore/store.go index 4c3c8148..32cf9ced 100644 --- a/controlplane/configstore/store.go +++ b/controlplane/configstore/store.go @@ -993,6 +993,29 @@ func (cs *ConfigStore) ListExpiredHotIdleWorkers(now time.Time, defaultTTL time. return workers, nil } +// ListExpiredHotIdleWorkersForCP is the owner-scoped variant of +// ListExpiredHotIdleWorkers: it returns only expired hot-idle workers owned by +// ownerCPInstanceID. Used by the per-CP fallback reaper, which runs on every +// replica independent of the janitor leader lease so hot-idle workers are still +// retired (and their nodes reclaimed) even if leadership is wedged/absent. Each +// replica only reaps the workers it owns; the leader janitor still handles +// cross-CP/orphaned rows. +func (cs *ConfigStore) ListExpiredHotIdleWorkersForCP(ownerCPInstanceID string, now time.Time, defaultTTL time.Duration) ([]WorkerRecord, error) { + defMins := int64(defaultTTL.Minutes()) + if defMins < 0 { + defMins = 0 + } + var workers []WorkerRecord + err := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())). + Where("state = ? AND owner_cp_instance_id = ? AND COALESCE(hot_idle_since, updated_at) + (CASE WHEN COALESCE(ttl_minutes, 0) > 0 THEN ttl_minutes ELSE ? END) * interval '1 minute' <= ?", + WorkerStateHotIdle, ownerCPInstanceID, defMins, now). + Find(&workers).Error + if err != nil { + return nil, fmt.Errorf("list expired hot-idle workers for cp: %w", err) + } + return workers, nil +} + // CountHotIdleWorkers returns the number of compatible hot-idle workers for an // org/profile/image bucket. Empty profile CPU/memory is the default profile. func (cs *ConfigStore) CountHotIdleWorkers(orgID, image, profileCPU, profileMemory string) (int, error) { diff --git a/controlplane/control.go b/controlplane/control.go index 1a30617c..04c50b3a 100644 --- a/controlplane/control.go +++ b/controlplane/control.go @@ -143,11 +143,14 @@ type K8sConfig struct { WorkerPriorityClassName string // PriorityClass for worker pods, so they preempt overprovision headroom pause pods (empty = none) AWSRegion string // AWS region for STS client - // Node-headroom controller: keep HeadroomPercent% of the worker nodepool's - // allocatable CPU+memory free via low-priority placeholder pods, so a worker - // spawn schedules immediately (preempting placeholders) rather than waiting - // on a fresh Karpenter node. 0 = disabled. - HeadroomPercent int // % of worker-nodepool allocatable to hold free (0 = disabled) + // Node-headroom controller holds preemptible low-priority placeholder pods so + // a worker spawn schedules immediately (preempting a placeholder) rather than + // waiting on a fresh Karpenter node. HeadroomNodes>0 selects the constant + // mode (a fixed number of node-sized placeholders, demand-independent — the + // prod default); HeadroomPercent is the legacy demand-proportional fallback. + // Both 0 = disabled. + HeadroomNodes int // CONSTANT node-headroom: number of node-sized placeholder pods (0 = use HeadroomPercent) + HeadroomPercent int // legacy demand-% headroom, used only when HeadroomNodes==0 (0 = disabled) PlaceholderImage string // Image for placeholder pods (a pause image) PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST rank below WorkerPriorityClassName @@ -166,7 +169,7 @@ type K8sConfig struct { // "default TTL", pairing with WorkerMaxTTL (the clamp). It governs both // no-ttl paths the same way: default-shape workers (reaped by the janitor) // and sized-but-no-ttl workers (stamped at profile resolution). Per-request - // precedence: client GUC > org default > this > built-in (20m, + // precedence: client GUC > org default > this > built-in (1m, // defaultWorkerTTL). Raise it above a tenant's job cadence (e.g. 70m for // hourly jobs) so scheduled workloads reuse hot-idle workers instead of // cold-spawning every run — at the cost of idle worker nodes. diff --git a/controlplane/headroom.go b/controlplane/headroom.go index bbb18515..2f4df796 100644 --- a/controlplane/headroom.go +++ b/controlplane/headroom.go @@ -16,6 +16,19 @@ import ( // headroomPodLabel marks the low-priority placeholder pods this controller owns. const headroomPodLabel = "duckgres-headroom" +// Node-sized placeholder shape for CONSTANT node-headroom (HeadroomNodes>0): +// one placeholder ≈ one worker node, so N placeholders reserve N preemptible +// nodes' worth of warm capacity regardless of current demand. Sized to an r6gd +// worker node's schedulable capacity (matches the historical exclusive-worker +// shape). A real worker (any shape) preempts one placeholder to claim a node; +// the next reconcile recreates it, which makes Karpenter add a node in the +// background. Kept as a constant — the count is the only knob — per the +// "keep headroom small and constant" decision. +const ( + headroomNodeCPU = "46" + headroomNodeMemory = "360Gi" +) + // headroomPlaceholdersNeeded returns how many placeholder pods of size // (phCPUMillis, phMemBytes) are required to hold `percent`% of the CURRENT // WORKER DEMAND (workerCPUMillis/workerMemBytes — the summed resource requests @@ -69,39 +82,31 @@ func ceilDiv(a, b int64) int { return int((a + b - 1) / b) } -// reconcileHeadroom drives the number of placeholder pods toward the configured -// percentage of CURRENT WORKER DEMAND (summed live worker pod requests). -// Leader-gated (called from the janitor, which runs only on the elected CP). -// A no-op when headroomPercent<=0. +// reconcileHeadroom drives the number of low-priority placeholder pods toward +// the headroom target. Leader-gated (called from the janitor, which runs only +// on the elected CP). Always runs (even when headroom is disabled) so that a +// disabled/just-disabled pool converges to ZERO placeholders instead of +// stranding the ones it created — there is no other path that deletes them. // -// Placeholders are sized to the pool's DEFAULT WORKER SHAPE (WorkerCPURequest/ -// WorkerMemoryRequest, else the built-in defaults) — no separate sizing knob: -// one preempted placeholder always frees exactly one default-worker slot, in -// every environment. They carry a PriorityClass BELOW the worker -// PriorityClass, so the scheduler preempts them to fit a real worker; a worker -// larger than one placeholder preempts AS MANY as it needs (standard k8s -// priority preemption). The reconcile then recreates placeholders back toward +// Two modes: +// - CONSTANT (HeadroomNodes>0, the prod default): keep exactly HeadroomNodes +// node-sized placeholders, INDEPENDENT of worker demand. This is what +// bounds headroom cost: the previous demand-proportional mode counted +// hot_idle (idle-but-alive) workers as demand and provisioned an extra +// headroomPercent% of full-worker-sized placeholders, so it AMPLIFIED any +// idle-worker leak ~1.75x. A fixed node count cannot do that. +// - LEGACY percent (HeadroomPercent>0, HeadroomNodes==0): demand-proportional +// sizing, retained for small/dev pools. +// +// Placeholders carry a PriorityClass BELOW the worker PriorityClass, so the +// scheduler preempts them to fit a real worker; a worker larger than one +// placeholder preempts as many as it needs. The reconcile recreates them toward // the target on the next tick, which makes Karpenter add node capacity in the -// background. So headroom self-heals after any size of spawn. +// background, so headroom self-heals after any spawn. func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) { - if p.headroomPercent <= 0 { - return - } - phCPU, err := resource.ParseQuantity(firstNonEmpty(p.workerCPURequest, defaultWorkerCPU)) - if err != nil { - slog.Warn("Headroom: invalid default worker cpu for placeholder sizing.", "value", p.workerCPURequest, "error", err) - return - } - phMem, err := resource.ParseQuantity(firstNonEmpty(p.workerMemoryRequest, defaultWorkerMemory)) - if err != nil { - slog.Warn("Headroom: invalid default worker memory for placeholder sizing.", "value", p.workerMemoryRequest, "error", err) - return - } - - workerCPU, workerMem, err := p.activeWorkerDemand(ctx) - if err != nil { - slog.Warn("Headroom: failed to sum worker demand.", "error", err) - return + desired, phCPU, phMem, ok := p.headroomTarget(ctx) + if !ok { + return // sizing/demand error already logged; back off, retry next tick } existing, _, err := p.listPlaceholderPods(ctx) @@ -110,8 +115,6 @@ func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) { return } - desired := headroomPlaceholdersNeeded(workerCPU, workerMem, phCPU.MilliValue(), phMem.Value(), p.headroomPercent) - switch { case len(existing) < desired: for i := len(existing); i < desired; i++ { @@ -120,15 +123,58 @@ func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) { return // back off; next tick retries } } - slog.Info("Headroom: scaled placeholder pods up.", "from", len(existing), "to", desired, "percent", p.headroomPercent) + slog.Info("Headroom: scaled placeholder pods up.", "from", len(existing), "to", desired) case len(existing) > desired: // Delete the newest placeholders first (stable order by name). sort.Slice(existing, func(i, j int) bool { return existing[i] > existing[j] }) for i := 0; i < len(existing)-desired; i++ { _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, existing[i], metav1.DeleteOptions{GracePeriodSeconds: int64Ptr(0)}) } - slog.Info("Headroom: scaled placeholder pods down.", "from", len(existing), "to", desired, "percent", p.headroomPercent) + slog.Info("Headroom: scaled placeholder pods down.", "from", len(existing), "to", desired) + } +} + +// headroomTarget resolves the desired placeholder count and per-placeholder +// size for this tick. ok=false means a transient sizing/demand error (already +// logged) and the caller should skip this tick. A desired of 0 with ok=true is +// the disabled state — the caller still lists and deletes any existing +// placeholders so they don't leak. +func (p *K8sWorkerPool) headroomTarget(ctx context.Context) (desired int, phCPU, phMem resource.Quantity, ok bool) { + if p.headroomNodes > 0 { + cpu, err := resource.ParseQuantity(headroomNodeCPU) + if err != nil { + slog.Warn("Headroom: invalid node cpu for placeholder sizing.", "value", headroomNodeCPU, "error", err) + return 0, resource.Quantity{}, resource.Quantity{}, false + } + mem, err := resource.ParseQuantity(headroomNodeMemory) + if err != nil { + slog.Warn("Headroom: invalid node memory for placeholder sizing.", "value", headroomNodeMemory, "error", err) + return 0, resource.Quantity{}, resource.Quantity{}, false + } + return p.headroomNodes, cpu, mem, true } + + if p.headroomPercent > 0 { + cpu, err := resource.ParseQuantity(firstNonEmpty(p.workerCPURequest, defaultWorkerCPU)) + if err != nil { + slog.Warn("Headroom: invalid default worker cpu for placeholder sizing.", "value", p.workerCPURequest, "error", err) + return 0, resource.Quantity{}, resource.Quantity{}, false + } + mem, err := resource.ParseQuantity(firstNonEmpty(p.workerMemoryRequest, defaultWorkerMemory)) + if err != nil { + slog.Warn("Headroom: invalid default worker memory for placeholder sizing.", "value", p.workerMemoryRequest, "error", err) + return 0, resource.Quantity{}, resource.Quantity{}, false + } + workerCPU, workerMem, err := p.activeWorkerDemand(ctx) + if err != nil { + slog.Warn("Headroom: failed to sum worker demand.", "error", err) + return 0, resource.Quantity{}, resource.Quantity{}, false + } + return headroomPlaceholdersNeeded(workerCPU, workerMem, cpu.MilliValue(), mem.Value(), p.headroomPercent), cpu, mem, true + } + + // Disabled: converge to zero placeholders. + return 0, resource.Quantity{}, resource.Quantity{}, true } // activeWorkerDemand sums the resource REQUESTS of live (non-terminating) diff --git a/controlplane/headroom_test.go b/controlplane/headroom_test.go index 94014788..13b350cb 100644 --- a/controlplane/headroom_test.go +++ b/controlplane/headroom_test.go @@ -4,14 +4,32 @@ package controlplane import ( "context" + "fmt" "testing" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" + k8stesting "k8s.io/client-go/testing" ) +// emulateGenerateName makes the fake clientset assign unique names from +// metav1.GenerateName on pod create, as the real API server does (the fake +// otherwise leaves Name empty, so multiple GenerateName creates collide). +func emulateGenerateName(cs *fake.Clientset) { + var n int + cs.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) { + pod, ok := action.(k8stesting.CreateAction).GetObject().(*corev1.Pod) + if ok && pod.Name == "" && pod.GenerateName != "" { + n++ + pod.Name = fmt.Sprintf("%s%d", pod.GenerateName, n) + } + return false, nil, nil // fall through to the default tracker with the mutated object + }) +} + func TestHeadroomPlaceholdersNeeded(t *testing.T) { const ( gi = 1 << 30 @@ -81,6 +99,70 @@ func TestPlaceholderPodGenerateName(t *testing.T) { } } +// Constant node-headroom keeps a FIXED number of node-sized placeholders and +// does NOT scale with worker demand — the property that prevents an idle-worker +// leak from being amplified into runaway placeholder capacity. +func TestReconcileHeadroomConstantNodesIgnoresDemand(t *testing.T) { + worker := map[string]string{"app": "duckgres-worker"} + mkWorker := func(name string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default", Labels: worker}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{ + Name: "duckdb-worker", + Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("64Gi"), + }}, + }}}, + } + } + // A large worker population (as a hot_idle leak would produce) must NOT + // inflate the placeholder count under constant mode. + objs := []runtime.Object{} + for i := 0; i < 50; i++ { + objs = append(objs, mkWorker(fmt.Sprintf("w%d", i))) + } + cs := fake.NewSimpleClientset(objs...) + emulateGenerateName(cs) + p := &K8sWorkerPool{clientset: cs, namespace: "default", cpID: "duckgres-cp-abcde", headroomNodes: 3} + + p.reconcileHeadroom(context.Background()) + + pods, err := cs.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{LabelSelector: "app=" + headroomPodLabel}) + if err != nil { + t.Fatalf("list placeholders: %v", err) + } + if len(pods.Items) != 3 { + t.Fatalf("constant headroom: expected 3 placeholders regardless of 50 workers, got %d", len(pods.Items)) + } + req := pods.Items[0].Spec.Containers[0].Resources.Requests + if req.Cpu().String() != headroomNodeCPU || req.Memory().String() != headroomNodeMemory { + t.Fatalf("expected node-sized placeholder %s/%s, got %s/%s", headroomNodeCPU, headroomNodeMemory, req.Cpu(), req.Memory()) + } +} + +// Disabling headroom (both knobs 0) must converge existing placeholders to zero, +// not strand them — there is no other path that deletes placeholder pods. +func TestReconcileHeadroomDisabledDeletesPlaceholders(t *testing.T) { + mkPlaceholder := func(name string) *corev1.Pod { + return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ + Name: name, Namespace: "default", Labels: map[string]string{"app": headroomPodLabel}, + }} + } + cs := fake.NewSimpleClientset(mkPlaceholder("duckgres-placeholder-x-1"), mkPlaceholder("duckgres-placeholder-x-2")) + p := &K8sWorkerPool{clientset: cs, namespace: "default", headroomNodes: 0, headroomPercent: 0} + + p.reconcileHeadroom(context.Background()) + + pods, err := cs.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{LabelSelector: "app=" + headroomPodLabel}) + if err != nil { + t.Fatalf("list placeholders: %v", err) + } + if len(pods.Items) != 0 { + t.Fatalf("expected disabled headroom to delete all placeholders, got %d", len(pods.Items)) + } +} + func TestActiveWorkerDemand(t *testing.T) { // Sums live worker pod requests; excludes terminating/terminal pods and // non-worker pods (placeholders must never feed the demand baseline). diff --git a/controlplane/janitor.go b/controlplane/janitor.go index 633ff4c8..9b254c03 100644 --- a/controlplane/janitor.go +++ b/controlplane/janitor.go @@ -40,7 +40,7 @@ type ControlPlaneJanitor struct { onStop func() retireMismatchedVersionWorker func() // reaps one idle worker whose Deployment version differs from this CP's (leader-only) cleanupOrphanedWorkerPods func() // deletes K8s worker pods whose DB row is terminal (retired/lost) or missing (leader-only) - reconcileHeadroom func() // maintains low-priority placeholder pods at the configured node-headroom % (leader-only) + reconcileHeadroom func() // maintains low-priority placeholder pods at the configured headroom target (constant HeadroomNodes or legacy percent; leader-only) hotIdleFloor func(configstore.WorkerSnapshot) int } @@ -164,26 +164,36 @@ func (j *ControlPlaneJanitor) runOnce() { // hotIdleTTL is the fallback for default/warm/legacy workers (ttl=0). expired, err := j.store.ListExpiredHotIdleSnapshots(j.now(), j.hotIdleTTL) if err != nil { + // Do NOT stamp the last-successful-reap gauge on a list failure: + // it backs the wedged-reaper alert, and a config-store outage (the + // moment reaping is actually dark) would otherwise keep it fresh + // and silence the alert. The per-CP reaper returns early here too. slog.Warn("Janitor failed to list expired hot-idle workers.", "error", err) - } - for _, snap := range expired { - if j.hotIdleFloor != nil { - floor := j.hotIdleFloor(snap) - if floor > 0 { - count, err := j.store.CountHotIdleWorkers(snap.OrgID(), snap.Image(), snap.ProfileCPU(), snap.ProfileMemory()) - if err != nil { - slog.Warn("Janitor failed to count compatible hot-idle workers.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) - continue - } - if count <= floor { - slog.Debug("Janitor skipping hot-idle TTL because worker is protected by floor.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "floor", floor, "compatible_hot_idle", count) - continue + } else { + for _, snap := range expired { + if j.hotIdleFloor != nil { + floor := j.hotIdleFloor(snap) + if floor > 0 { + count, err := j.store.CountHotIdleWorkers(snap.OrgID(), snap.Image(), snap.ProfileCPU(), snap.ProfileMemory()) + if err != nil { + slog.Warn("Janitor failed to count compatible hot-idle workers.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) + continue + } + if count <= floor { + slog.Debug("Janitor skipping hot-idle TTL because worker is protected by floor.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "floor", floor, "compatible_hot_idle", count) + continue + } } } + if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, "hot_idle_ttl_expired", LifecycleOriginJanitorHotIdleTTL); err != nil { + slog.Warn("Janitor failed to retire hot-idle worker.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) + } } - if _, err := j.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, "hot_idle_ttl_expired", LifecycleOriginJanitorHotIdleTTL); err != nil { - slog.Warn("Janitor failed to retire hot-idle worker.", "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) - } + // Stamp the last-successful-reap gauge only after a successful + // list pass so an alert can detect a wedged/absent reaper. The + // per-CP fallback reaper stamps the same gauge, so max() across + // replicas tracks "some replica reaped recently". + observeHotIdleReapRun(j.now()) } } } diff --git a/controlplane/janitor_leader_k8s.go b/controlplane/janitor_leader_k8s.go index 61d0a956..793e7307 100644 --- a/controlplane/janitor_leader_k8s.go +++ b/controlplane/janitor_leader_k8s.go @@ -24,11 +24,18 @@ type leaderElectorRunner interface { Run(context.Context) } +// defaultElectorRecontendBackoff is the pause between losing the janitor lease +// and re-contending for it. Short enough that a lost leader rejoins the +// election promptly (so reaping is never long without a leader), long enough +// that a persistently failing Run() doesn't hot-loop the API server. +const defaultElectorRecontendBackoff = 2 * time.Second + type JanitorLeaderManager struct { - elector leaderElectorRunner - leaderLoop *leaderOnlyLoop - mu sync.Mutex - cancel context.CancelFunc + elector leaderElectorRunner + leaderLoop *leaderOnlyLoop + recontendBackoff time.Duration + mu sync.Mutex + cancel context.CancelFunc } func NewJanitorLeaderManager(namespace, identity string, janitor *ControlPlaneJanitor) (*JanitorLeaderManager, error) { @@ -91,8 +98,12 @@ func newJanitorLeaderManagerFromClients( ReleaseOnCancel: true, Name: "duckgres-janitor", Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: leaderLoop.onStartedLeading, + OnStartedLeading: func(ctx context.Context) { + setJanitorIsLeader(true) + leaderLoop.onStartedLeading(ctx) + }, OnStoppedLeading: func() { + setJanitorIsLeader(false) leaderLoop.onStoppedLeading() slog.Info("Lost janitor leadership.") }, @@ -106,8 +117,9 @@ func newJanitorLeaderManagerFromClients( } return &JanitorLeaderManager{ - elector: elector, - leaderLoop: leaderLoop, + elector: elector, + leaderLoop: leaderLoop, + recontendBackoff: defaultElectorRecontendBackoff, }, nil } @@ -121,8 +133,40 @@ func (m *JanitorLeaderManager) Start(ctx context.Context) error { m.cancel() } m.cancel = cancel + backoff := m.recontendBackoff m.mu.Unlock() - go m.elector.Run(runCtx) + if backoff <= 0 { + backoff = defaultElectorRecontendBackoff + } + + // client-go's LeaderElector.Run returns as soon as this CP loses (or fails + // to renew) the lease — it does NOT re-contend on its own. Running it only + // once (the prior behavior) meant a single transient renewal failure + // (API-server blip > RenewDeadline, GC pause, network hiccup) permanently + // dropped this replica out of the election. Because every fleet-wide + // reclamation pass (hot-idle TTL reap, orphan/stuck sweeps, stranded-pod + // cleanup, headroom scale-down) is leader-gated, enough leadership churn + // would leave the lease stale with no contender and the reaper dark + // fleet-wide. Re-contend in a loop so a lost leader always rejoins; the + // elector is safe to Run again after it returns. The loop exits only when + // runCtx is cancelled (Stop or parent shutdown). + go func() { + for { + m.elector.Run(runCtx) + // Run returned: either runCtx was cancelled (Stop/shutdown → exit) + // or we lost the lease. Re-contend after a short backoff. Checking + // the error before the backoff means a cancelled ctx exits promptly + // without an extra Run call. + if runCtx.Err() != nil { + return + } + select { + case <-runCtx.Done(): + return + case <-time.After(backoff): + } + } + }() return nil } diff --git a/controlplane/janitor_leader_k8s_test.go b/controlplane/janitor_leader_k8s_test.go index d84d72b0..ace8ecb9 100644 --- a/controlplane/janitor_leader_k8s_test.go +++ b/controlplane/janitor_leader_k8s_test.go @@ -4,6 +4,7 @@ package controlplane import ( "context" + "sync" "testing" "time" ) @@ -17,6 +18,31 @@ func (e *captureLeaderElector) Run(ctx context.Context) { close(e.ctxDone) } +// countingLeaderElector models client-go's LeaderElector after a lost lease: +// Run records the call and returns immediately rather than blocking. A correct +// manager must re-invoke Run (re-contend) rather than calling it once. +type countingLeaderElector struct { + mu sync.Mutex + calls int + runCh chan struct{} +} + +func (e *countingLeaderElector) Run(ctx context.Context) { + e.mu.Lock() + e.calls++ + e.mu.Unlock() + select { + case e.runCh <- struct{}{}: + default: + } +} + +func (e *countingLeaderElector) count() int { + e.mu.Lock() + defer e.mu.Unlock() + return e.calls +} + func TestJanitorLeaderManagerStopCancelsLeaderElection(t *testing.T) { elector := &captureLeaderElector{ctxDone: make(chan struct{})} manager := &JanitorLeaderManager{ @@ -35,3 +61,49 @@ func TestJanitorLeaderManagerStopCancelsLeaderElection(t *testing.T) { t.Fatal("expected Stop to cancel leader-election context") } } + +// Regression for the elector-never-restarts bug: when Run returns (lease lost), +// the manager must re-contend by invoking Run again, not stay out of the +// election. Without the re-contend loop Run is called exactly once. +func TestJanitorLeaderManagerReContendsAfterLosingLeadership(t *testing.T) { + elector := &countingLeaderElector{runCh: make(chan struct{}, 100)} + manager := &JanitorLeaderManager{ + elector: elector, + leaderLoop: newLeaderOnlyLoop(func(context.Context) {}), + recontendBackoff: time.Millisecond, + } + if err := manager.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + defer manager.Stop() + + for i := 0; i < 3; i++ { + select { + case <-elector.runCh: + case <-time.After(2 * time.Second): + t.Fatalf("expected re-contention; Run invoked only %d times", elector.count()) + } + } +} + +// After Stop, the re-contend loop must terminate (no further Run invocations). +func TestJanitorLeaderManagerStopHaltsReContendLoop(t *testing.T) { + elector := &countingLeaderElector{runCh: make(chan struct{}, 1000)} + manager := &JanitorLeaderManager{ + elector: elector, + leaderLoop: newLeaderOnlyLoop(func(context.Context) {}), + recontendBackoff: time.Millisecond, + } + if err := manager.Start(context.Background()); err != nil { + t.Fatalf("Start: %v", err) + } + <-elector.runCh // ensure the loop is spinning + manager.Stop() + + time.Sleep(20 * time.Millisecond) // let any in-flight iteration drain + before := elector.count() + time.Sleep(50 * time.Millisecond) + if after := elector.count(); after != before { + t.Fatalf("expected loop to stop after Stop; Run calls grew %d -> %d", before, after) + } +} diff --git a/controlplane/k8s_pool.go b/controlplane/k8s_pool.go index fbe4cc15..186ac79f 100644 --- a/controlplane/k8s_pool.go +++ b/controlplane/k8s_pool.go @@ -75,13 +75,21 @@ type K8sWorkerPool struct { workerTolerationValue string // taint value for NoSchedule toleration workerPriorityClassName string // PriorityClass for worker pods (preempts overprovision pause pods) - // Headroom controller: keep headroomPercent% of worker-nodepool allocatable - // CPU+mem held by low-priority placeholder pods so a worker spawn schedules - // immediately (preempting placeholders) instead of waiting on a fresh node. + // Headroom controller holds preemptible low-priority placeholder pods so a + // worker spawn schedules immediately (preempting a placeholder) instead of + // waiting on a fresh Karpenter node. headroomNodes>0 selects the CONSTANT + // mode (a fixed number of node-sized placeholders, demand-independent — the + // prod default); otherwise headroomPercent selects the legacy + // demand-proportional mode. Both 0 = disabled (placeholders converge to 0). + headroomNodes int headroomPercent int placeholderImage string placeholderPriorityClassName string + // activatingHBInterval overrides the activating-row heartbeat cadence + // (0 = activatingHeartbeatInterval). Set small in tests; not configured in prod. + activatingHBInterval time.Duration + orgID string // org ID for pod labels (multi-tenant mode) workerIDGenerator func() int // shared ID generator across orgs (nil = internal counter) resolveOrgConfig func(string) (*configstore.OrgConfig, error) // resolve org config for per-tenant image reaping @@ -187,6 +195,7 @@ func newK8sWorkerPool(cfg K8sWorkerPoolConfig, clientset kubernetes.Interface) ( workerTolerationValue: cfg.WorkerTolerationValue, workerPriorityClassName: cfg.WorkerPriorityClassName, + headroomNodes: cfg.HeadroomNodes, headroomPercent: cfg.HeadroomPercent, placeholderImage: cfg.PlaceholderImage, placeholderPriorityClassName: cfg.PlaceholderPriorityClassName, diff --git a/controlplane/k8s_pool_acquire.go b/controlplane/k8s_pool_acquire.go index ade11599..b83875a0 100644 --- a/controlplane/k8s_pool_acquire.go +++ b/controlplane/k8s_pool_acquire.go @@ -9,6 +9,7 @@ import ( stderrors "errors" "fmt" "log/slog" + "sync" "time" "github.com/apache/arrow-go/v18/arrow/flight/flightsql" @@ -166,7 +167,13 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana if err != nil { return err } - _ = p.persistWorkerRecord(activatingRecord) + // A failed activating persist is self-correcting: the hot persist below is a + // full upsert that overwrites the durable state once activation succeeds, and + // a failed activation retires the worker. Log it (it shouldn't happen) but do + // not abort activation on a transient write blip. + if err := p.persistWorkerRecord(activatingRecord); err != nil { + p.logw(worker.ID).Warn("Failed to persist activating worker record.", "error", err) + } activate := p.activateTenantFunc if activate == nil { @@ -184,7 +191,36 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana } } - if err := activate(ctx, worker, payload); err != nil { + // Heartbeat the activating row while activate() runs — it can include a long, + // in-band DuckLake migration. The leader stuck-activating reaper keys off + // updated_at, and (unlike the pool-local reaper) cannot see the live + // in-memory session; without this, a legitimately-progressing activation that + // outlives activateTimeout (5m) would be CAS-retired mid-session by the + // durable reaper and its pod deleted. The heartbeat ties "fresh" to "this + // activation goroutine is alive and running": a wedged activation returns via + // its ctx deadline (workerSpawnActivateTimeout) and is retired below, and a + // crashed CP's goroutine dies so the row goes stale and IS reaped — so + // genuinely-stuck rows are still cleaned up. The fenced upsert can't resurrect + // a worker another path retired (state/epoch guard). + stopHB := make(chan struct{}) + hbDone := make(chan struct{}) + go func() { + defer close(hbDone) + p.heartbeatActivatingWorker(worker, stopHB) + }() + // stopHeartbeat is idempotent (sync.Once) and deferred as a panic-safe + // backstop AND called explicitly on the normal path. The explicit call + // joins the heartbeat BEFORE the Hot-commit block below, so a heartbeat can + // never clobber the committed Hot row; the defer guarantees the goroutine + + // ticker are not leaked if activate() panics. + var stopOnce sync.Once + stopHeartbeat := func() { stopOnce.Do(func() { close(stopHB); <-hbDone }) } + defer stopHeartbeat() + + activateErr := activate(ctx, worker, payload) + stopHeartbeat() + + if err := activateErr; err != nil { if hadPrevState { p.mu.Lock() _ = worker.SetSharedState(prevState) @@ -208,16 +244,71 @@ func (p *K8sWorkerPool) ActivateReservedWorker(ctx context.Context, worker *Mana p.mu.Unlock() return err } + // Persist the hot row BEFORE committing the in-memory Hot transition. If we + // flipped in-memory to Hot but the durable row stayed at activating (the old + // swallowed-error behavior), the leader stuck-activating reaper would later + // CAS-retire this LIVE, about-to-serve worker and delete its pod mid-session. + // On persist failure leave the worker Activating and return the error: the + // caller (activateWorkerForOrg) retires the worker and surfaces the error to + // the client (no transparent retry), which is wasteful but never strands a + // durable=activating / in-memory=Hot split. workerRecordFor takes the target + // state explicitly, so building it before the in-memory commit is safe. + hotRecord := p.workerRecordFor(worker.ID, worker, worker.OwnerEpoch(), configstore.WorkerStateHot, "", nil) + if err := p.persistWorkerRecord(hotRecord); err != nil { + p.mu.Unlock() + return fmt.Errorf("persist hot worker record: %w", err) + } if setErr := worker.SetSharedState(nextState); setErr != nil { p.mu.Unlock() return setErr } - hotRecord := p.workerRecordFor(worker.ID, worker, worker.OwnerEpoch(), configstore.WorkerStateHot, "", nil) p.mu.Unlock() - _ = p.persistWorkerRecord(hotRecord) return nil } +// activatingHeartbeatInterval is how often a long-running activation refreshes +// its durable activating row so the leader stuck-activating reaper (which keys +// off updated_at) does not treat a progressing activation as wedged. Well below +// the janitor's activateTimeout (5m). +const activatingHeartbeatInterval = 60 * time.Second + +// heartbeatActivatingWorker re-persists the worker's activating row on a timer +// until stop is closed, keeping updated_at fresh while activate() is in flight. +// Stops (and skips) the moment the worker is no longer this CP's and still +// Activating — so once activation commits Hot under p.mu, a racing heartbeat +// sees the new state and can never clobber the Hot row back to activating. +func (p *K8sWorkerPool) heartbeatActivatingWorker(worker *ManagedWorker, stop <-chan struct{}) { + interval := p.activatingHBInterval + if interval <= 0 { + interval = activatingHeartbeatInterval + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-stop: + return + case <-ticker.C: + // Build AND persist under p.mu so this serializes with the Hot + // transition's persist: the lifecycle re-check below means a heartbeat + // that wins the lock after Hot was committed skips instead of writing + // a stale activating row. persistWorkerRecord doesn't take p.mu and + // logs via plain slog, so holding the lock here is deadlock-safe + // (mirrors markWorkerRetiredLocked). + p.mu.Lock() + w, ok := p.workers[worker.ID] + if !ok || w != worker || w.SharedState().NormalizedLifecycle() != WorkerLifecycleActivating { + p.mu.Unlock() + return + } + now := time.Now() + rec := p.workerRecordFor(worker.ID, worker, worker.OwnerEpoch(), configstore.WorkerStateActivating, "", &now) + _ = p.persistWorkerRecord(rec) + p.mu.Unlock() + } + } +} + // ReserveSharedWorker claims a hot-idle worker or spawns one on demand. // Composition of the gateable decision (reserveSharedWorkerDecision) and the slow // completion (completeSharedWorkerReservation); OrgReservedPool calls the two @@ -253,6 +344,11 @@ func (p *K8sWorkerPool) ReserveSharedWorker(ctx context.Context, assignment *Wor type sharedWorkerClaim struct { hotClaimed *configstore.WorkerRecord slotID int + // slot is the durable spawning-slot row (state=spawning) created by + // CreateSpawningWorkerSlot, retained so a failed spawn can CAS it terminal + // on the request thread instead of leaking org+global cap until the leader's + // stale-spawning sweep. nil in runtime-store-less mode (no durable row). + slot *configstore.WorkerRecord } // reserveSharedWorkerDecision is the short, DB-only half of ReserveSharedWorker: @@ -307,7 +403,7 @@ func (p *K8sWorkerPool) reserveSharedWorkerDecision(assignment *WorkerAssignment if slot == nil { return nil, NewWorkerCapacityExhaustedErrorForReason(configstore.WorkerClaimMissReasonOrgCap, DefaultWorkerSpawnRetryAfter) } - return &sharedWorkerClaim{slotID: slot.WorkerID}, nil + return &sharedWorkerClaim{slotID: slot.WorkerID, slot: slot}, nil } // Runtime-store-less mode (unit tests / standalone k8s): no durable pool, @@ -348,6 +444,17 @@ func (p *K8sWorkerPool) completeSharedWorkerReservation(ctx context.Context, cla return nil, true, reserveErr } worker, err = p.spawnReservedWorkerForSlot(ctx, claim.slotID, assignment) + if err != nil && claim.slot != nil { + // The spawning-slot row counts against the org + global worker caps until + // it is reaped. Free it on the request thread (CAS spawning->terminal) + // instead of waiting for the leader-only 10m stale-spawning sweep — + // otherwise a burst of transient spawn failures drives the org to a false + // at-cap state (spurious OrgCap refusals) and the ghost slots survive a + // janitor-leader outage indefinitely. If spawnReservedWorkerForSlot + // already retired the row via its post-spawn failure paths, this CAS + // simply misses (row no longer spawning) and is a harmless no-op. + p.retireClaimedWorker(claim.slot, RetireReasonSpawnFailure, LifecycleOriginSpawnFailure) + } return worker, false, err } diff --git a/controlplane/k8s_pool_lifecycle.go b/controlplane/k8s_pool_lifecycle.go index 154c6dba..1cd66477 100644 --- a/controlplane/k8s_pool_lifecycle.go +++ b/controlplane/k8s_pool_lifecycle.go @@ -146,35 +146,48 @@ func (p *K8sWorkerPool) RetireIfDrainingAndEmpty(id int, origin LifecycleOrigin) // The session decrement always happens regardless of the return value. func (p *K8sWorkerPool) TransitionToHotIdleIfNoSessions(id int) bool { p.mu.Lock() + defer p.mu.Unlock() w, ok := p.workers[id] if !ok { - p.mu.Unlock() return false } if w.activeSessions > 0 { w.activeSessions-- } if w.activeSessions != 0 { - p.mu.Unlock() return false } if w.SharedState().NormalizedLifecycle() != WorkerLifecycleHot { - p.mu.Unlock() return false } nextState, err := w.SharedState().Transition(WorkerLifecycleHotIdle, nil) if err != nil { - p.mu.Unlock() + return false + } + // Persist the hot_idle row BEFORE committing the in-memory transition + // (mirrors markWorkerRetiredLocked). workerRecordFor takes the target state + // explicitly and reads only stable fields (profile, owner, assignment), so + // it is safe to build before the in-memory commit. If the durable write + // fails we must NOT advance to hot_idle: a durable-hot / in-memory-hot_idle + // split hides the worker from BOTH reuse (ClaimHotIdleWorker filters + // state=hot_idle) AND the TTL reaper (same filter) until this CP restarts, + // stranding an idle pod forever. Leaving it Hot keeps it reusable by its org + // (findIdleAssignedWorkerLocked reuses Hot, activeSessions==0 workers) and a + // later release/retire retries the park; the session decrement above already + // happened, so the worker is honestly idle, just not yet durably parked. + hotIdleRecord := p.workerRecordFor(id, w, w.OwnerEpoch(), configstore.WorkerStateHotIdle, "", nil) + if err := p.persistWorkerRecord(hotIdleRecord); err != nil { + observeHotIdlePersistFailure(w.image) + // Log with the worker's own attrs, NOT p.logw(id): logw -> p.Worker(id) + // takes p.mu.RLock and we hold p.mu here (sync.RWMutex is not reentrant, + // so logw would self-deadlock). workerLogAttrs reads only worker fields. + slog.With(workerLogAttrs(w)...).Warn("Failed to persist hot_idle transition; leaving worker hot for retry.", "error", err) return false } if err := w.SetSharedState(nextState); err != nil { - p.mu.Unlock() return false } w.lastUsed = time.Now() - hotIdleRecord := p.workerRecordFor(id, w, w.OwnerEpoch(), configstore.WorkerStateHotIdle, "", nil) - p.mu.Unlock() - _ = p.persistWorkerRecord(hotIdleRecord) return true } @@ -797,6 +810,14 @@ func (p *K8sWorkerPool) reapStuckActivatingWorkers() { continue default: } + // A worker with a live pre-claimed session is NOT stuck — a client is + // blocked on its activation (which can legitimately run long, e.g. an + // in-band DuckLake migration). Retiring it here would delete the pod out + // from under that client. Only reap genuinely idle reserved/activating + // workers; a truly wedged activation with no waiter still gets reaped. + if w.activeSessions > 0 { + continue + } lifecycle := w.SharedState().NormalizedLifecycle() if (lifecycle == WorkerLifecycleReserved || lifecycle == WorkerLifecycleActivating) && !w.reservedAt.IsZero() && now.Sub(w.reservedAt) > timeout { diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index 1eb3b4c1..11e10d67 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -993,6 +993,109 @@ func TestK8sPool_RetireWorkerIfNoSessions_LastSession(t *testing.T) { } } +// Regression for fix #3: a transient persist failure at the hot->hot_idle park +// must leave the worker Hot (still reusable), NOT advance in-memory to hot_idle +// while the durable row stays hot (the invisible-to-both split). The session is +// still decremented. +func TestK8sPoolTransitionToHotIdlePersistFailureStaysHot(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + pool.runtimeStore = &captureRuntimeWorkerStore{upsertErr: errors.New("store blip")} + worker := &ManagedWorker{ID: 5, activeSessions: 1, done: make(chan struct{})} + if err := worker.SetSharedState(SharedWorkerState{ + Lifecycle: WorkerLifecycleHot, + Assignment: &WorkerAssignment{OrgID: "analytics"}, + }); err != nil { + t.Fatalf("SetSharedState: %v", err) + } + pool.workers[worker.ID] = worker + + if pool.TransitionToHotIdleIfNoSessions(worker.ID) { + t.Fatal("expected park to report false on persist failure") + } + if got := worker.SharedState().NormalizedLifecycle(); got != WorkerLifecycleHot { + t.Fatalf("expected worker to stay Hot on persist failure, got %q", got) + } + if worker.activeSessions != 0 { + t.Fatalf("expected session decremented to 0, got %d", worker.activeSessions) + } +} + +// Regression for fix #4: when the durable Hot persist fails during activation, +// ActivateReservedWorker must return an error and NOT advance the worker to Hot +// (leaving a durable=activating / in-memory=Hot split the stuck reaper could +// kill mid-session). +func TestK8sPoolActivateReservedWorkerHotPersistFailureReturnsError(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + pool.runtimeStore = &captureRuntimeWorkerStore{upsertErr: errors.New("store blip")} + worker := &ManagedWorker{ID: 9, done: make(chan struct{})} + if err := worker.SetSharedState(SharedWorkerState{ + Lifecycle: WorkerLifecycleReserved, + Assignment: &WorkerAssignment{OrgID: "analytics"}, + }); err != nil { + t.Fatalf("SetSharedState: %v", err) + } + pool.workers[worker.ID] = worker + pool.activateTenantFunc = func(ctx context.Context, got *ManagedWorker, payload TenantActivationPayload) error { + return nil // activation itself succeeds; only the durable Hot persist fails + } + + err := pool.ActivateReservedWorker(context.Background(), worker, TenantActivationPayload{OrgID: "analytics"}) + if err == nil { + t.Fatal("expected ActivateReservedWorker to return an error when the Hot persist fails") + } + if got := worker.SharedState().NormalizedLifecycle(); got == WorkerLifecycleHot { + t.Fatalf("expected worker NOT advanced to Hot on persist failure, got %q", got) + } +} + +// Regression for (b): a long-running activation must heartbeat its durable +// activating row so the leader stuck-activating reaper (which keys off +// updated_at and can't see the live session) doesn't reap it mid-activation. +func TestK8sPoolActivateReservedWorkerHeartbeatsActivatingRow(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{} + pool.runtimeStore = store + pool.activatingHBInterval = 20 * time.Millisecond + worker := &ManagedWorker{ID: 11, done: make(chan struct{})} + if err := worker.SetSharedState(SharedWorkerState{ + Lifecycle: WorkerLifecycleReserved, + Assignment: &WorkerAssignment{OrgID: "analytics"}, + }); err != nil { + t.Fatalf("SetSharedState: %v", err) + } + pool.workers[worker.ID] = worker + pool.activateTenantFunc = func(ctx context.Context, got *ManagedWorker, payload TenantActivationPayload) error { + time.Sleep(150 * time.Millisecond) // long activation; heartbeats should fire + return nil + } + + if err := pool.ActivateReservedWorker(context.Background(), worker, TenantActivationPayload{OrgID: "analytics"}); err != nil { + t.Fatalf("ActivateReservedWorker: %v", err) + } + + activatingUpserts, sawHot := 0, false + for _, r := range store.snapshot() { + if r.WorkerID != 11 { + continue + } + switch r.State { + case configstore.WorkerStateActivating: + activatingUpserts++ + case configstore.WorkerStateHot: + sawHot = true + } + } + if activatingUpserts < 2 { + t.Fatalf("expected >=2 activating upserts (initial + heartbeat), got %d", activatingUpserts) + } + if !sawHot { + t.Fatal("expected a final Hot upsert after activation completed") + } + if got := worker.SharedState().NormalizedLifecycle(); got != WorkerLifecycleHot { + t.Fatalf("expected Hot lifecycle after activation, got %q", got) + } +} + func TestK8sPoolActivateReservedWorkerTransitionsToHot(t *testing.T) { pool, _ := newTestK8sPool(t, 5) worker := &ManagedWorker{ID: 7, done: make(chan struct{})} @@ -1306,6 +1409,53 @@ func TestK8sPoolReserveSharedWorkerDecisionDoesNotReplaceIncompatibleHotIdleOnGl } } +// Regression for the spawning-slot leak: when the pod spawn fails, the durable +// spawning-slot row must be CAS'd terminal on the request thread (freeing the +// org+global cap) rather than left for the leader's 10m stale-spawning sweep. +func TestK8sPoolReserveSharedWorkerFreesSlotOnSpawnFailure(t *testing.T) { + pool, _ := newTestK8sPool(t, 5) + store := &captureRuntimeWorkerStore{ + hotIdleClaimMissReason: configstore.WorkerClaimMissReasonNoIdle, + spawned: &configstore.WorkerRecord{ + WorkerID: 77, + PodName: "duckgres-worker-test-cp-77", + State: configstore.WorkerStateSpawning, + OrgID: "analytics", + OwnerCPInstanceID: pool.cpInstanceID, + }, + } + pool.runtimeStore = store + pool.spawnWorkerFunc = func(ctx context.Context, id int, image string, profile WorkerProfile) error { + return fmt.Errorf("boom: node unschedulable") + } + + worker, err := pool.ReserveSharedWorker(context.Background(), &WorkerAssignment{ + OrgID: "analytics", + MaxWorkers: 5, + Image: "duckgres:v2", + }) + if err == nil { + t.Fatalf("expected spawn failure error, got worker=%#v", worker) + } + + freed := false + for i, id := range store.markTerminalCalledIDs { + if id != 77 { + continue + } + freed = true + if store.markTerminalReasons[i] != RetireReasonSpawnFailure { + t.Fatalf("expected slot freed with reason %q, got %q", RetireReasonSpawnFailure, store.markTerminalReasons[i]) + } + if store.markTerminalStates[i] != configstore.WorkerStateRetired { + t.Fatalf("expected slot freed to retired, got %q", store.markTerminalStates[i]) + } + } + if !freed { + t.Fatalf("expected spawning slot 77 to be freed on spawn failure; MarkWorkerTerminalIfCurrent ids=%v", store.markTerminalCalledIDs) + } +} + func TestK8sPoolClaimSpecificWorkerTakesOverRuntimeWorker(t *testing.T) { pool, _ := newTestK8sPool(t, 5) store := &captureRuntimeWorkerStore{ diff --git a/controlplane/multitenant.go b/controlplane/multitenant.go index d89d9c6d..5891e32a 100644 --- a/controlplane/multitenant.go +++ b/controlplane/multitenant.go @@ -30,10 +30,10 @@ type orgRouterAdapter struct { // effectiveDefaultWorkerTTL resolves the janitor's hot-idle retention: the // operator default TTL (DUCKGRES_K8S_WORKER_DEFAULT_TTL → // K8sConfig.WorkerDefaultTTL) when set, otherwise the single built-in -// defaultWorkerTTL (20m — the same fallback sized-but-no-ttl requests get at +// defaultWorkerTTL (1m — the same fallback sized-but-no-ttl requests get at // profile resolution, so there is exactly ONE default TTL however a worker // came to have no explicit one). The full per-request precedence is: -// client GUC > org default > deployment default TTL > built-in 20m. +// client GUC > org default > deployment default TTL > built-in 1m. func effectiveDefaultWorkerTTL(configured time.Duration) time.Duration { if configured > 0 { return configured @@ -199,6 +199,7 @@ func SetupMultiTenant( WorkerTolerationKey: cfg.K8s.WorkerTolerationKey, WorkerTolerationValue: cfg.K8s.WorkerTolerationValue, WorkerPriorityClassName: cfg.K8s.WorkerPriorityClassName, + HeadroomNodes: cfg.K8s.HeadroomNodes, HeadroomPercent: cfg.K8s.HeadroomPercent, PlaceholderImage: cfg.K8s.PlaceholderImage, PlaceholderPriorityClassName: cfg.K8s.PlaceholderPriorityClassName, @@ -324,16 +325,34 @@ func SetupMultiTenant( } return org.DefaultWorkerMinHotIdle } - // Node-headroom controller: keep low-priority placeholder pods holding the - // configured % of worker-nodepool allocatable free so worker spawns schedule - // immediately. Leader-only (runs on the janitor tick). No-op when disabled. - if cfg.K8s.HeadroomPercent > 0 { - janitor.reconcileHeadroom = func() { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - router.sharedPool.reconcileHeadroom(ctx) - } - } + // Node-headroom controller: keep low-priority placeholder pods as warm, + // preemptible spare capacity so worker spawns schedule immediately. + // Leader-only (runs on the janitor tick). Always wired — reconcileHeadroom + // itself decides the target (constant HeadroomNodes, legacy HeadroomPercent, + // or disabled). It must run even when disabled so a pool that had headroom + // turned off converges its placeholders to zero instead of stranding them. + janitor.reconcileHeadroom = func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + router.sharedPool.reconcileHeadroom(ctx) + } + + // Per-CP fallback hot-idle reaper: runs on EVERY replica, independent of the + // janitor leader lease, retiring this replica's own expired hot-idle workers. + // Backstops the leader-only janitor reaper above so a wedged/absent leader + // can no longer let idle worker pods (and their r6gd nodes) accumulate + // fleet-wide. Reuses the same TTL, floor, lifecycle and store as the janitor; + // the fenced CAS makes concurrent leader/fallback retires safe. + fallbackReaper := &perCPHotIdleReaper{ + store: store, + lifecycle: router.sharedPool.lifecycle, + cpInstanceID: cpInstanceID, + hotIdleTTL: janitor.hotIdleTTL, + hotIdleFloor: janitor.hotIdleFloor, + orphanGrace: janitor.orphanGrace, // same cutoff as the leader orphan sweep + interval: time.Minute, + } + go fallbackReaper.Run(context.Background()) // Scheduler-side activator: a single SharedWorkerActivator instance // that the credential-refresh tick uses to re-broker STS sessions for diff --git a/controlplane/per_cp_hot_idle_reaper.go b/controlplane/per_cp_hot_idle_reaper.go new file mode 100644 index 00000000..d93dcbe3 --- /dev/null +++ b/controlplane/per_cp_hot_idle_reaper.go @@ -0,0 +1,148 @@ +package controlplane + +import ( + "context" + "log/slog" + "time" + + "github.com/posthog/duckgres/controlplane/configstore" +) + +// perCPHotIdleStore is the store surface the per-CP fallback reaper needs. +type perCPHotIdleStore interface { + ListExpiredHotIdleSnapshotsForCP(ownerCPInstanceID string, now time.Time, defaultTTL time.Duration) ([]configstore.WorkerSnapshot, error) + CountHotIdleWorkers(orgID, image, profileCPU, profileMemory string) (int, error) + // ListOrphanedWorkerSnapshots returns workers whose owning CP instance is no + // longer live (expired/missing past the cutoff). Reused here so the per-CP + // reaper can reclaim hot-idle workers orphaned by a rollout/crash without + // waiting on the leader. + ListOrphanedWorkerSnapshots(before time.Time) ([]configstore.WorkerSnapshot, error) +} + +// perCPHotIdleReaper retires THIS replica's own expired hot-idle workers on a +// local timer, INDEPENDENT of the janitor leader lease. It is the backstop for +// the leader-only janitor reaper (janitor.go): if leadership is wedged or +// absent, idle worker pods — and the r6gd nodes behind them — would otherwise +// accumulate fleet-wide until a CP restart. Each replica reaps only workers it +// owns (owner_cp_instance_id == self); orphaned/cross-CP rows remain the leader +// janitor's responsibility. The retire goes through the same fenced CAS +// (WorkerLifecycle.RetireFromSnapshot) as the leader path, so a worker retired +// concurrently by both simply CAS-misses on one side — no double delete. +type perCPHotIdleReaper struct { + store perCPHotIdleStore + lifecycle *WorkerLifecycle + cpInstanceID string + hotIdleTTL time.Duration + // hotIdleFloor mirrors the janitor's floor function so a per-org + // DefaultWorkerMinHotIdle warm reserve is honored here too (the fallback + // must not over-reap a floor the leader would have preserved). Counts are + // global (CountHotIdleWorkers), so the floor holds across replicas. + hotIdleFloor func(configstore.WorkerSnapshot) int + // orphanGrace is how long an owning CP instance must have been gone before + // its hot-idle workers are treated as orphaned and reaped here. 0 disables + // the per-CP orphan pass (self-owned reaping still runs). Matches the leader + // janitor's orphanGrace so timing is identical, just leader-independent. + orphanGrace time.Duration + interval time.Duration + now func() time.Time +} + +func (r *perCPHotIdleReaper) Run(ctx context.Context) { + if r == nil || r.store == nil || r.lifecycle == nil { + return + } + interval := r.interval + if interval <= 0 { + interval = time.Minute + } + r.runOnce() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.runOnce() + } + } +} + +func (r *perCPHotIdleReaper) runOnce() { + now := r.now + if now == nil { + now = time.Now + } + + // Orphan pass: reap hot-idle workers whose OWNING CP instance is no longer + // live. cpInstanceID is freshly minted on every CP start, so a rollout/crash + // leaves the prior process's hot-idle rows owned by a dead instance — owned + // by no live CP, they are invisible to the self-owned pass below and were + // previously reaped ONLY by the leader janitor's orphan sweep. Running it + // here makes hot-idle reclamation genuinely leader-independent (the whole + // point of this reaper) so a rollout concurrent with a leaderless window + // can't accumulate orphaned idle pods. RetireOrphanFromSnapshot re-checks + // owner-still-expired in its fenced CAS, so a worker meanwhile reclaimed by a + // live CP (ClaimHotIdleWorker) is skipped; no floor is applied (a dead + // owner's worker is not a warm reserve), matching the leader sweep. Runs + // independent of hotIdleTTL — like the leader's orphan sweep, which is not + // gated on the hot-idle TTL block — so a (hypothetical) hotIdleTTL<=0 config + // can't disable the orphan backstop. + r.reapOrphanedHotIdle(now()) + + if r.hotIdleTTL <= 0 { + return + } + expired, err := r.store.ListExpiredHotIdleSnapshotsForCP(r.cpInstanceID, now(), r.hotIdleTTL) + if err != nil { + slog.Warn("Per-CP hot-idle reaper failed to list expired workers.", "cp", r.cpInstanceID, "error", err) + return + } + for _, snap := range expired { + if r.hotIdleFloor != nil { + if floor := r.hotIdleFloor(snap); floor > 0 { + count, err := r.store.CountHotIdleWorkers(snap.OrgID(), snap.Image(), snap.ProfileCPU(), snap.ProfileMemory()) + if err != nil { + slog.Warn("Per-CP hot-idle reaper failed to count compatible hot-idle workers.", + "worker", snap.WorkerID(), "org", snap.OrgID(), "error", err) + continue + } + if count <= floor { + continue + } + } + } + if _, err := r.lifecycle.RetireFromSnapshot(snap, configstore.WorkerStateRetired, "hot_idle_ttl_expired", LifecycleOriginPerCPHotIdleTTL); err != nil { + slog.Warn("Per-CP hot-idle reaper failed to retire worker.", + "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) + } + } + + // Stamp the last-successful-reap gauge after a successful self-owned list + // (reached only if that list did not error): the alert cares that the reap + // path RAN recently, not that it found work. + observeHotIdleReapRun(now()) +} + +func (r *perCPHotIdleReaper) reapOrphanedHotIdle(now time.Time) { + if r.orphanGrace <= 0 { + return + } + orphaned, err := r.store.ListOrphanedWorkerSnapshots(now.Add(-r.orphanGrace)) + if err != nil { + slog.Warn("Per-CP hot-idle reaper failed to list orphaned workers.", "error", err) + return + } + for _, snap := range orphaned { + // Only hot-idle: other orphan states (spawning/reserved/activating/ + // draining) stay the leader orphan sweep's job — they can carry + // in-flight work this reaper must not second-guess. + if snap.State() != configstore.WorkerStateHotIdle { + continue + } + if _, err := r.lifecycle.RetireOrphanFromSnapshot(snap, "orphaned", LifecycleOriginPerCPOrphan); err != nil { + slog.Warn("Per-CP hot-idle reaper failed to retire orphaned worker.", + "worker", snap.WorkerID(), "worker_pod", snap.PodName(), "org", snap.OrgID(), "error", err) + } + } +} diff --git a/controlplane/per_cp_hot_idle_reaper_test.go b/controlplane/per_cp_hot_idle_reaper_test.go new file mode 100644 index 00000000..c0c73673 --- /dev/null +++ b/controlplane/per_cp_hot_idle_reaper_test.go @@ -0,0 +1,158 @@ +package controlplane + +import ( + "testing" + "time" + + "github.com/posthog/duckgres/controlplane/configstore" +) + +type fakePerCPHotIdleStore struct { + expired []configstore.WorkerSnapshot + listErr error + counts map[string]int + listCPID string + orphaned []configstore.WorkerSnapshot +} + +func (s *fakePerCPHotIdleStore) ListExpiredHotIdleSnapshotsForCP(cpID string, now time.Time, ttl time.Duration) ([]configstore.WorkerSnapshot, error) { + s.listCPID = cpID + return s.expired, s.listErr +} + +func (s *fakePerCPHotIdleStore) CountHotIdleWorkers(orgID, image, cpu, mem string) (int, error) { + return s.counts[orgID], nil +} + +func (s *fakePerCPHotIdleStore) ListOrphanedWorkerSnapshots(before time.Time) ([]configstore.WorkerSnapshot, error) { + return s.orphaned, nil +} + +func hotIdleSnap(id int, org string) configstore.WorkerSnapshot { + return configstore.NewWorkerSnapshot(configstore.WorkerRecord{ + WorkerID: id, + State: configstore.WorkerStateHotIdle, + OrgID: org, + Image: "img", + }) +} + +// The fallback reaper retires every expired hot-idle worker this CP owns, +// scoping the list to its own cp instance id, and stamps the last-run gauge. +func TestPerCPHotIdleReaperRetiresExpired(t *testing.T) { + store := &fakePerCPHotIdleStore{expired: []configstore.WorkerSnapshot{hotIdleSnap(1, "a"), hotIdleSnap(2, "b")}} + lcStore := &fakeLifecycleStore{terminalReturn: true} + r := &perCPHotIdleReaper{ + store: store, + lifecycle: NewWorkerLifecycle(lcStore, &fakePhysicalCleanup{}), + cpInstanceID: "cp-self", + hotIdleTTL: time.Minute, + } + + r.runOnce() + + if store.listCPID != "cp-self" { + t.Fatalf("expected list scoped to own cp id, got %q", store.listCPID) + } + if len(lcStore.terminalTransitions) != 2 { + t.Fatalf("expected 2 retires, got %d", len(lcStore.terminalTransitions)) + } + for _, c := range lcStore.terminalTransitions { + if c.target != configstore.WorkerStateRetired { + t.Fatalf("expected retired target, got %q", c.target) + } + if c.reason != "hot_idle_ttl_expired" { + t.Fatalf("expected hot_idle_ttl_expired reason, got %q", c.reason) + } + } +} + +// A per-org DefaultWorkerMinHotIdle floor must be honored: a worker is NOT +// reaped while the org is at/under its floor, so the fallback can't erode a +// warm reserve the leader would have preserved. +func TestPerCPHotIdleReaperHonorsFloor(t *testing.T) { + store := &fakePerCPHotIdleStore{ + expired: []configstore.WorkerSnapshot{hotIdleSnap(1, "protected")}, + counts: map[string]int{"protected": 1}, // exactly at floor + } + lcStore := &fakeLifecycleStore{terminalReturn: true} + r := &perCPHotIdleReaper{ + store: store, + lifecycle: NewWorkerLifecycle(lcStore, &fakePhysicalCleanup{}), + cpInstanceID: "cp-self", + hotIdleTTL: time.Minute, + hotIdleFloor: func(configstore.WorkerSnapshot) int { return 1 }, + } + + r.runOnce() + + if len(lcStore.terminalTransitions) != 0 { + t.Fatalf("expected floor-protected worker NOT to be retired, got %d retires", len(lcStore.terminalTransitions)) + } +} + +// The per-CP reaper also reaps hot-idle workers orphaned by a dead owning CP +// (rollout/crash), but leaves non-hot-idle orphans to the leader sweep. This is +// the leader-independent backstop for the rollout+leaderless-window case. +func TestPerCPHotIdleReaperReapsOrphanedHotIdle(t *testing.T) { + spawningOrphan := configstore.NewWorkerSnapshot(configstore.WorkerRecord{ + WorkerID: 10, State: configstore.WorkerStateSpawning, OrgID: "x", Image: "img", + }) + store := &fakePerCPHotIdleStore{ + orphaned: []configstore.WorkerSnapshot{hotIdleSnap(9, "deadorg"), spawningOrphan}, + } + lcStore := &fakeLifecycleStore{orphanReturn: true} + r := &perCPHotIdleReaper{ + store: store, + lifecycle: NewWorkerLifecycle(lcStore, &fakePhysicalCleanup{}), + cpInstanceID: "cp-self", + hotIdleTTL: time.Minute, + orphanGrace: 30 * time.Second, + } + + r.runOnce() + + if len(lcStore.orphanTransitions) != 1 { + t.Fatalf("expected exactly 1 orphan retire (the hot-idle one), got %d: %+v", len(lcStore.orphanTransitions), lcStore.orphanTransitions) + } + if lcStore.orphanTransitions[0].workerID != 9 { + t.Fatalf("expected hot-idle orphan 9 reaped, got worker %d", lcStore.orphanTransitions[0].workerID) + } +} + +// orphanGrace==0 disables the orphan pass (self-owned reaping still runs). +func TestPerCPHotIdleReaperOrphanPassDisabled(t *testing.T) { + store := &fakePerCPHotIdleStore{orphaned: []configstore.WorkerSnapshot{hotIdleSnap(9, "deadorg")}} + lcStore := &fakeLifecycleStore{orphanReturn: true} + r := &perCPHotIdleReaper{ + store: store, + lifecycle: NewWorkerLifecycle(lcStore, &fakePhysicalCleanup{}), + cpInstanceID: "cp-self", + hotIdleTTL: time.Minute, + orphanGrace: 0, + } + + r.runOnce() + + if len(lcStore.orphanTransitions) != 0 { + t.Fatalf("expected no orphan retires when orphanGrace=0, got %d", len(lcStore.orphanTransitions)) + } +} + +// Disabled (hotIdleTTL<=0) is a no-op — no listing, no retires. +func TestPerCPHotIdleReaperDisabled(t *testing.T) { + store := &fakePerCPHotIdleStore{expired: []configstore.WorkerSnapshot{hotIdleSnap(1, "a")}} + lcStore := &fakeLifecycleStore{terminalReturn: true} + r := &perCPHotIdleReaper{ + store: store, + lifecycle: NewWorkerLifecycle(lcStore, &fakePhysicalCleanup{}), + cpInstanceID: "cp-self", + hotIdleTTL: 0, + } + + r.runOnce() + + if len(lcStore.terminalTransitions) != 0 { + t.Fatalf("expected no retires when disabled, got %d", len(lcStore.terminalTransitions)) + } +} diff --git a/controlplane/worker_lifecycle_metrics.go b/controlplane/worker_lifecycle_metrics.go index d4c4d86e..b8473e38 100644 --- a/controlplane/worker_lifecycle_metrics.go +++ b/controlplane/worker_lifecycle_metrics.go @@ -81,6 +81,21 @@ const ( // pod terminations (eviction, OOM, manual delete, node drain) from // our own health-check decisions. LifecycleOriginInformerCrash LifecycleOrigin = "informer_crash" + // LifecycleOriginPerCPHotIdleTTL marks the per-CP fallback hot-idle reaper + // (per_cp_hot_idle_reaper.go), which runs on EVERY replica independent of the + // janitor leader lease. Distinct from LifecycleOriginJanitorHotIdleTTL (the + // leader-only janitor reaper) so dashboards can tell when the fallback — + // rather than the leader — is doing the reaping, which is the signal that + // leadership is wedged. + LifecycleOriginPerCPHotIdleTTL LifecycleOrigin = "per_cp_hot_idle_ttl" + // LifecycleOriginPerCPOrphan marks the per-CP fallback reaper retiring a + // hot-idle worker whose OWNING CP instance is no longer live (rollout/crash + // orphan). Distinct from LifecycleOriginJanitorOrphan (the leader-only orphan + // sweep) so dashboards can see the fallback doing cross-CP orphan reclamation + // during a leaderless window. Every CP start mints a fresh cpInstanceID, so a + // graceful rollout orphans the prior process's hot-idle rows; without this + // path they would wait on the leader. + LifecycleOriginPerCPOrphan LifecycleOrigin = "per_cp_orphan" // LifecycleOriginPoolStuckActivating marks the pool-local // reapStuckActivatingWorkers loop, which runs every minute on every // CP. Distinct from LifecycleOriginJanitorStuckActivating (which is @@ -160,6 +175,10 @@ const ( RetireReasonIdleTimeout = "idle_timeout" RetireReasonStuckActivating = "stuck_activating" RetireReasonMismatchedVersion = "mismatched_version" + // RetireReasonSpawnFailure marks a spawning-slot row freed on the request + // thread after its pod spawn failed (k8s_pool_acquire.go), so the org+global + // cap is released immediately instead of waiting for the stale-spawning sweep. + RetireReasonSpawnFailure = "spawn_failure" ) // --- Metric definitions --- @@ -201,6 +220,16 @@ var workerHealthChecksCounter = promauto.NewCounterVec(prometheus.CounterOpts{ Help: "Worker RPC health-check probes partitioned by result (pass|fail) and image. Pass-rate complements the mark-lost transitions counter: a worker that's intermittently failing health checks but not yet crossing the consecutive-failure threshold is invisible without this.", }, []string{"result", "image"}) +// hotIdleReapLastRunGauge is updated by both the leader janitor (janitor.go, +// untagged) and the per-CP fallback reaper (per_cp_hot_idle_reaper.go, untagged), +// so it stays in this shared file. The kubernetes-only janitor_is_leader and +// hot_idle_persist_failures metrics live in worker_lifecycle_metrics_k8s.go so +// they are not flagged unused under the default-tag lint. +var hotIdleReapLastRunGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_control_plane_hot_idle_reap_last_run_timestamp_seconds", + Help: "Unix timestamp of the most recent successful hot-idle reap pass on THIS replica — either the leader janitor or the per-CP fallback reaper. Alert when max() across replicas falls far behind now(): idle worker pods are not being retired and node capacity is leaking.", +}) + // --- Observation helpers --- // observeLifecycleTransition records one transition attempt on @@ -294,6 +323,13 @@ func observeDrainTotalDuration(d time.Duration) { workerDrainTotalDurationHistogram.Observe(d.Seconds()) } +// observeHotIdleReapRun stamps the last-successful-hot-idle-reap gauge with the +// given time. Called by both the leader janitor and the per-CP fallback reaper +// (both untagged), so it lives here rather than in the kubernetes-tagged file. +func observeHotIdleReapRun(t time.Time) { + hotIdleReapLastRunGauge.Set(float64(t.Unix())) +} + // observeHealthCheck records the result of one health-check probe. // Empty result drops the sample. func observeHealthCheck(result HealthCheckResult, image string) { diff --git a/controlplane/worker_lifecycle_metrics_k8s.go b/controlplane/worker_lifecycle_metrics_k8s.go new file mode 100644 index 00000000..b12b9766 --- /dev/null +++ b/controlplane/worker_lifecycle_metrics_k8s.go @@ -0,0 +1,46 @@ +//go:build kubernetes + +package controlplane + +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +// Metrics referenced only from kubernetes-tagged code (the janitor leader +// elector and the hot->hot_idle park). They live in a kubernetes-tagged file so +// the default-tag lint (golangci-lint runs without build tags) does not flag +// these unexported symbols as unused — under the default build neither the +// metric nor its only caller is compiled. + +var janitorIsLeaderGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "duckgres_control_plane_janitor_is_leader", + Help: "1 if this control-plane replica currently holds the janitor leader lease, else 0. Summed across replicas it should be ~1; a sustained 0 means no replica is running the fleet-wide reapers (hot-idle TTL, orphan/stuck sweeps, headroom) — alert on it.", +}) + +var workerHotIdlePersistFailuresCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "duckgres_worker_hot_idle_persist_failures_total", + Help: "Failures to durably persist a worker's hot->hot_idle transition. The worker is held Hot (and stays reusable by its org) rather than advanced to an in-memory-hot_idle / durable-hot split that would be invisible to BOTH reuse and the TTL reaper. A sustained nonzero rate points at runtime-store write problems.", +}, []string{"image"}) + +// setJanitorIsLeader records whether this replica currently holds the janitor +// leader lease (1) or not (0). +func setJanitorIsLeader(isLeader bool) { + if isLeader { + janitorIsLeaderGauge.Set(1) + return + } + janitorIsLeaderGauge.Set(0) +} + +// observeHotIdlePersistFailure increments the hot_idle persist-failure counter. +// Empty image falls back to "unknown". +func observeHotIdlePersistFailure(image string) { + img := strings.TrimSpace(image) + if img == "" { + img = "unknown" + } + workerHotIdlePersistFailuresCounter.WithLabelValues(img).Inc() +} diff --git a/controlplane/worker_pool.go b/controlplane/worker_pool.go index adc8d0ab..54191436 100644 --- a/controlplane/worker_pool.go +++ b/controlplane/worker_pool.go @@ -109,7 +109,8 @@ type K8sWorkerPoolConfig struct { WorkerTolerationKey string // Taint key for worker pod NoSchedule toleration. Empty = no toleration. WorkerTolerationValue string // Taint value for worker pod NoSchedule toleration. WorkerPriorityClassName string // PriorityClass for worker pods (so they preempt overprovision pause pods). Empty = none. - HeadroomPercent int // Keep this % of worker-nodepool allocatable CPU+mem free via low-priority placeholder pods (0 = disabled). + HeadroomNodes int // CONSTANT node-headroom: keep this many node-sized preemptible placeholder pods, independent of demand. >0 takes precedence over HeadroomPercent. + HeadroomPercent int // Legacy demand-proportional headroom: keep this % of worker demand free via placeholder pods. Used only when HeadroomNodes==0. 0 = disabled. PlaceholderImage string // Image for headroom placeholder pods (a pause image). PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST be below WorkerPriorityClassName so workers preempt them. OrgID string // Org ID for pod labels (multi-tenant mode) diff --git a/controlplane/worker_profile.go b/controlplane/worker_profile.go index 0d0b239b..06bd3c82 100644 --- a/controlplane/worker_profile.go +++ b/controlplane/worker_profile.go @@ -20,12 +20,15 @@ const ( // and the deployment did not configure its own default size/TTL. // defaultWorkerTTL is the SINGLE built-in TTL: it backs both sized-but-no-ttl // requests here and default-shape workers in the janitor -// (effectiveDefaultWorkerTTL), so "no TTL anywhere" means 20m regardless of -// how the worker was requested. +// (effectiveDefaultWorkerTTL), so "no TTL anywhere" means 1m regardless of +// how the worker was requested. Kept short so an idle one-session worker pod +// (and the r6gd node behind it) is reclaimed quickly by default; deployments +// that want longer warm reuse set DUCKGRES_K8S_WORKER_DEFAULT_TTL, and a +// specific connection/org can opt up via duckgres.worker_ttl / default_worker_ttl. const ( defaultWorkerCPU = "8" defaultWorkerMemory = "16Gi" - defaultWorkerTTL = 20 * time.Minute + defaultWorkerTTL = 1 * time.Minute ) // orgWorkerProfileDefaults carries an org's operator-set default worker @@ -53,7 +56,7 @@ type orgWorkerProfileDefaults struct { // ignored with a warning — a bad config-store row must not break // connections. // 3. pool-global WorkerCPURequest/MemoryRequest (else built-in 8/16Gi); TTL -// defaults to the deployment WorkerDefaultTTL (else built-in 20m). +// defaults to the deployment WorkerDefaultTTL (else built-in 1m). // // Returns: // - (nil, warns, false, nil) => the DEFAULT profile: no client sizing and no @@ -127,7 +130,7 @@ func resolveWorkerProfileWithConfig(k K8sConfig, opts map[string]string, org org mem := firstNonEmpty(orgMem, firstNonEmpty(strings.TrimSpace(k.WorkerMemoryRequest), defaultWorkerMemory)) // TTL base layer mirrors cpu/mem: org default, else the deployment default // TTL (DUCKGRES_K8S_WORKER_DEFAULT_TTL — the same knob that governs - // default-shape workers via the janitor), else the built-in 20m. + // default-shape workers via the janitor), else the built-in 1m. ttl := defaultWorkerTTL if k.WorkerDefaultTTL > 0 { ttl = k.WorkerDefaultTTL diff --git a/docs/design/worker-ttl-pool.md b/docs/design/worker-ttl-pool.md index dcac34c2..86dba527 100644 --- a/docs/design/worker-ttl-pool.md +++ b/docs/design/worker-ttl-pool.md @@ -45,7 +45,7 @@ options=-c duckgres.worker_cpu=8 -c duckgres.worker_memory=16Gi -c duckgres.work - `duckgres.worker_cpu` — integer cores. Default **8**. - `duckgres.worker_memory` — k8s quantity. Default **16Gi**. -- `duckgres.worker_ttl` — Go duration. Default **20m**. +- `duckgres.worker_ttl` — Go duration. Default **1m**. - Absent GUCs → defaults. Gated behind `AllowClientWorkerSizing`; sizes clamped to `[min,max]` and ttl to `[0,maxTTL]` per deployment (out-of-range → clamp + warn). Gate off → every request uses the defaults. @@ -57,7 +57,9 @@ explicit one): 1. client GUC `duckgres.worker_ttl` (gated, clamped to `WORKER_MAX_TTL`) 2. org default `default_worker_ttl` (admin API `PUT /orgs/:id`, #742) 3. deployment default `DUCKGRES_K8S_WORKER_DEFAULT_TTL` -4. built-in **20m** +4. built-in **1m** (kept short so idle one-session worker pods + their nodes are + reclaimed quickly by default; raise the deployment/org default for tenants + that want warm reuse, or pin a floor via `default_worker_min_hot_idle`) `duckgres.colocate` / `worker_tier` are removed (unknown GUCs are ignored, so old clients that still send them degrade to defaults rather than erroring). @@ -156,8 +158,13 @@ only) it: disrupted; pinning is bounded by query lifetime, not the worker TTL (which would stall drift rollouts). -Config: `DUCKGRES_K8S_HEADROOM_PERCENT` (0 = disabled), placeholder pod size, -the two PriorityClass names. Manifests add the PriorityClasses. +Config: `DUCKGRES_K8S_HEADROOM_NODES` (preferred — a CONSTANT number of +node-sized placeholders, demand-independent, the prod default) or the legacy +`DUCKGRES_K8S_HEADROOM_PERCENT` (demand-proportional, used only when +HEADROOM_NODES is 0); 0 on both = disabled (existing placeholders are deleted). +Plus the two PriorityClass names. Manifests add the PriorityClasses. The +constant mode exists because the percent mode counted hot_idle (idle-but-alive) +workers as demand and so amplified any idle-worker leak into extra placeholders. ## Config knobs (env-only K8s, per existing convention) @@ -167,9 +174,10 @@ options), `DUCKGRES_K8S_WORKER_PROFILE_MIN_CPU`/`_MAX_CPU`/`_MIN_MEMORY`/ `_MAX_MEMORY` (clamps), `DUCKGRES_K8S_WORKER_MAX_TTL` (clamp ceiling) and `DUCKGRES_K8S_WORKER_DEFAULT_TTL` (the default for requests that specify no ttl — see the TTL resolution chain above), -`DUCKGRES_K8S_HEADROOM_PERCENT`, `DUCKGRES_K8S_PLACEHOLDER_IMAGE`/`_PRIORITY_CLASS` -(`_PLACEHOLDER_CPU`/`_MEMORY` are removed — placeholders take the default -worker shape). +`DUCKGRES_K8S_HEADROOM_NODES` (constant node-headroom; preferred) / +`DUCKGRES_K8S_HEADROOM_PERCENT` (legacy demand-%), `DUCKGRES_K8S_PLACEHOLDER_IMAGE`/`_PRIORITY_CLASS` +(`_PLACEHOLDER_CPU`/`_MEMORY` are removed — percent-mode placeholders take the +default worker shape; constant-mode placeholders are node-sized). Removed: all `DUCKGRES_K8S_*COLOCATED*`, `*WORKER_TIERS*`, `*ALLOW_CLIENT_EXCLUSIVE_NODE*`, `*SHARED_WARM_TARGET*`, diff --git a/docs/runbooks/replenish-capacity.md b/docs/runbooks/replenish-capacity.md index b61aea52..4772c5a5 100644 --- a/docs/runbooks/replenish-capacity.md +++ b/docs/runbooks/replenish-capacity.md @@ -17,10 +17,10 @@ There is **no warm pool**. In the remote/k8s backend a worker pod is spawned `duckgres.worker_cpu`/`worker_memory` request, and kept **hot-idle** after the session ends so the same org can reuse it (by exact shape) until its `duckgres.worker_ttl` expires. Startup latency is hidden by the **node-headroom -controller** (`reconcileHeadroom`, a janitor hook): it keeps -`DUCKGRES_K8S_HEADROOM_PERCENT` of the worker nodepool's allocatable CPU+memory -held by low-priority placeholder ("pause") pods; a real worker spawn preempts a -placeholder and schedules immediately. So "low capacity" now means one of: spawns +controller** (`reconcileHeadroom`, a janitor hook): it keeps a constant +`DUCKGRES_K8S_HEADROOM_NODES` node-sized low-priority placeholder ("pause") pods +(or, in the legacy mode, `DUCKGRES_K8S_HEADROOM_PERCENT` of worker demand) ready; +a real worker spawn preempts a placeholder and schedules immediately. So "low capacity" now means one of: spawns are failing, the cluster can't schedule pods, headroom is exhausted, or an org / the global pool has hit its worker cap. @@ -76,9 +76,10 @@ unassigned anymore); production capacity lives in `binding="org_bound"`. ``` - Confirm placeholder pods exist and are `Running` (they should be preempted, then rescheduled, as real workers spawn). - - Raise `DUCKGRES_K8S_HEADROOM_PERCENT` (or `placeholderCPU`/`placeholderMemory` - in the chart) to hold more capacity ready. Placeholders use a PriorityClass - ranked **below** the worker PriorityClass so workers always win. + - Raise `DUCKGRES_K8S_HEADROOM_NODES` (constant mode) — or + `DUCKGRES_K8S_HEADROOM_PERCENT` in the legacy mode — to hold more capacity + ready. Placeholders use a PriorityClass ranked **below** the worker + PriorityClass so workers always win. 4. **Check the caps.** If you hit `worker capacity exhausted for organization` or `...by global pool limit`, the request is at a real cap, not a scheduling diff --git a/main.go b/main.go index 34c53fc5..12cdbadb 100644 --- a/main.go +++ b/main.go @@ -376,6 +376,7 @@ func main() { WorkerTolerationValue: resolved.K8sWorkerTolerationValue, AllowClientWorkerProfile: resolved.K8sAllowClientWorkerProfile, WorkerPriorityClassName: resolved.K8sWorkerPriorityClassName, + HeadroomNodes: resolved.K8sHeadroomNodes, HeadroomPercent: resolved.K8sHeadroomPercent, PlaceholderImage: resolved.K8sPlaceholderImage, PlaceholderPriorityClassName: resolved.K8sPlaceholderPriorityClassName, diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index d348184f..89a3be9d 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -831,6 +831,38 @@ reuse_sized_worker() { # org password catalog cpu memory ttl log "sized worker reuse OK: 1 pod of cpu=$cpu (reused, no respawn)" } +# Hot-idle TTL RETIREMENT (end-to-end smoke): a worker parked hot-idle past its +# TTL must be reaped (pod deleted), so an idle one-session worker — and the node +# behind it — does not linger. This exercises the real reap path (CP -> store -> +# pod delete) against a single healthy CP holding the lease. NOTE: it is NOT a +# regression gate for the leader-loss / persist-failure / spawn-failure paths — +# those would pass on the pre-fix binary too (one healthy leader already reaps +# within TTL). The regression gates for those live in the unit tests +# (janitor_leader_k8s_test.go re-contend, k8s_pool_test.go persist-failure, +# per_cp_hot_idle_reaper_test.go orphan/floor); killing the leader or injecting a +# store fault in-Job is not deterministic here. Uses worker_ttl=1m (the smallest +# non-truncated TTL) and a shape no other test uses (3 CPU) so the pod is ours. +hot_idle_retired() { # org password catalog cpu memory + org="$1"; pw="$2"; cat="$3"; cpu="$4"; mem="$5"; ttl=1m + log "hot-idle TTL retirement on $org/$cat: cpu=$cpu mem=$mem ttl=$ttl (idle worker must be reaped)" + ( export PGOPTIONS="-c duckgres.worker_cpu=$cpu -c duckgres.worker_memory=$mem -c duckgres.worker_ttl=$ttl" + _pg_exec "$org" "$pw" "$cat" 'SELECT 1' ) >/dev/null \ + || fail "hot-idle retire: query failed on $org/$cat" + n="$(count_org_workers_of_cpu "$org" "$cpu")" + [ "$n" -ge 1 ] || fail "hot-idle retire: expected >=1 pod of cpu=$cpu after connect, got $n" + # Session has ended → worker parks hot-idle → its 1m TTL expires → reaper must + # delete the pod. Poll (fast-exit) up to ~4m: 1m TTL + per-CP reaper tick (1m) + + # cluster slack. A pod that never disappears is the leak regression. + i=0 + while [ "$i" -lt 16 ]; do + sleep 15 + [ "$(count_org_workers_of_cpu "$org" "$cpu")" = "0" ] && { + log "hot-idle retire OK: cpu=$cpu worker reaped after TTL"; return; } + i=$((i + 1)) + done + fail "hot-idle retire: cpu=$cpu worker still present after ~4m (TTL=1m) — hot-idle reaper not retiring idle workers (the idle-leak regression)" +} + # ---- org default worker profile -------------------------------------------- # Operators can give a tenant a server-side default worker shape + hot-idle TTL # (config-store columns default_worker_cpu/memory/ttl, set via the admin API). @@ -1407,6 +1439,9 @@ lane_cnpg() { # full wire/catalog/concurrency/sizing coverage on the cnpg org reuse_sized_worker "$CNPG" "$cnpg_pw" ducklake 2 4Gi 15m sized_worker "$CNPG" "$cnpg_pw" iceberg 1 2Gi 15m reuse_sized_worker "$CNPG" "$cnpg_pw" iceberg 1 2Gi 15m + # Idle worker reclamation: a 3-CPU worker with a 1m TTL must be reaped after it + # goes idle (catches the hot-idle-reaper-dark / persist-swallow idle leak). + hot_idle_retired "$CNPG" "$cnpg_pw" ducklake 3 6Gi } # Busy-only Karpenter disruption protection: a worker pod must carry