diff --git a/controlplane/k8s_pool_helpers.go b/controlplane/k8s_pool_helpers.go index e3e11907..3ff841ea 100644 --- a/controlplane/k8s_pool_helpers.go +++ b/controlplane/k8s_pool_helpers.go @@ -11,6 +11,7 @@ import ( "github.com/posthog/duckgres/controlplane/configstore" "github.com/posthog/duckgres/server" + corev1 "k8s.io/api/core/v1" ) // --- Helpers --- @@ -212,3 +213,97 @@ func (p *K8sWorkerPool) logw(id int) *slog.Logger { } return slog.With("worker", id) } + +func workerPodStatusLogAttrs(pod *corev1.Pod) []any { + if pod == nil { + return nil + } + attrs := []any{ + "pod_phase", string(pod.Status.Phase), + } + if pod.Status.Reason != "" { + attrs = append(attrs, "pod_reason", pod.Status.Reason) + } + if pod.Status.Message != "" { + attrs = append(attrs, "pod_message", pod.Status.Message) + } + if pod.Spec.NodeName != "" { + attrs = append(attrs, "pod_node", pod.Spec.NodeName) + } + if pod.Status.PodIP != "" { + attrs = append(attrs, "pod_ip", pod.Status.PodIP) + } + if pod.DeletionTimestamp != nil { + attrs = append(attrs, "pod_deletion_timestamp", pod.DeletionTimestamp.Time.UTC().Format(time.RFC3339Nano)) + } + if status, ok := workerContainerStatus(pod.Status.ContainerStatuses); ok { + attrs = append(attrs, containerStatusLogAttrs(status)...) + } + return attrs +} + +func workerContainerStatus(statuses []corev1.ContainerStatus) (corev1.ContainerStatus, bool) { + if len(statuses) == 0 { + return corev1.ContainerStatus{}, false + } + for _, status := range statuses { + if status.Name == "duckdb-worker" { + return status, true + } + } + return statuses[0], true +} + +func containerStatusLogAttrs(status corev1.ContainerStatus) []any { + attrs := []any{ + "container_name", status.Name, + "container_restart_count", status.RestartCount, + } + switch { + case status.State.Terminated != nil: + term := status.State.Terminated + attrs = append(attrs, + "container_state", "terminated", + "container_reason", term.Reason, + "container_exit_code", term.ExitCode, + "container_signal", term.Signal, + ) + if term.Message != "" { + attrs = append(attrs, "container_message", term.Message) + } + if !term.StartedAt.IsZero() { + attrs = append(attrs, "container_started_at", term.StartedAt.Time.UTC().Format(time.RFC3339Nano)) + } + if !term.FinishedAt.IsZero() { + attrs = append(attrs, "container_finished_at", term.FinishedAt.Time.UTC().Format(time.RFC3339Nano)) + } + case status.State.Waiting != nil: + waiting := status.State.Waiting + attrs = append(attrs, + "container_state", "waiting", + "container_reason", waiting.Reason, + ) + if waiting.Message != "" { + attrs = append(attrs, "container_message", waiting.Message) + } + case status.State.Running != nil: + running := status.State.Running + attrs = append(attrs, "container_state", "running") + if !running.StartedAt.IsZero() { + attrs = append(attrs, "container_started_at", running.StartedAt.Time.UTC().Format(time.RFC3339Nano)) + } + default: + attrs = append(attrs, "container_state", "unknown") + } + if term := status.LastTerminationState.Terminated; term != nil { + attrs = append(attrs, + "last_container_reason", term.Reason, + "last_container_exit_code", term.ExitCode, + "last_container_signal", term.Signal, + ) + if !term.FinishedAt.IsZero() { + attrs = append(attrs, "last_container_finished_at", term.FinishedAt.Time.UTC().Format(time.RFC3339Nano)) + } + } + return attrs +} diff --git a/controlplane/k8s_pool_lifecycle.go b/controlplane/k8s_pool_lifecycle.go index 9221c429..154c6dba 100644 --- a/controlplane/k8s_pool_lifecycle.go +++ b/controlplane/k8s_pool_lifecycle.go @@ -438,15 +438,44 @@ func (p *K8sWorkerPool) HealthCheckLoop(ctx context.Context, interval time.Durat } observeControlPlaneWorkers(workerCount) - p.logw(lease.workerID).Error("K8s worker unresponsive, deleting pod.", "consecutive_failures", count) if onCrash != nil { onCrash(lease.workerID) } - // Delete the pod to force cleanup + + // Snapshot pod/container state before the delete below removes + // the easiest source of OOMKilled/Evicted/exit-code evidence. podName := p.workerPodName(removedWorker) - _ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, podName, metav1.DeleteOptions{ - GracePeriodSeconds: int64Ptr(10), - }) + deleteAttrs := []any{"consecutive_failures", count} + if podName != "" && p.clientset != nil { + statusCtx, statusCancel := context.WithTimeout(ctx, 2*time.Second) + pod, err := p.clientset.CoreV1().Pods(p.namespace).Get(statusCtx, podName, metav1.GetOptions{}) + statusCancel() + if err != nil { + deleteAttrs = append(deleteAttrs, "pod_status_error", err) + } else { + deleteAttrs = append(deleteAttrs, workerPodStatusLogAttrs(pod)...) + } + } + logger := slog.With(workerLogAttrs(removedWorker)...) + logger.Error("K8s worker unresponsive, deleting pod.", deleteAttrs...) + deleteResultAttrs := append(append([]any{}, deleteAttrs...), "grace_period_seconds", 10) + if podName == "" || p.clientset == nil { + logger.Warn("K8s worker pod delete skipped.", append(deleteResultAttrs, "reason", "missing_pod_or_client")...) + } else { + deleteCtx, deleteCancel := context.WithTimeout(context.Background(), 10*time.Second) + err := p.clientset.CoreV1().Pods(p.namespace).Delete(deleteCtx, podName, metav1.DeleteOptions{ + GracePeriodSeconds: int64Ptr(10), + }) + deleteCancel() + switch { + case err == nil: + logger.Info("K8s worker pod delete requested.", deleteResultAttrs...) + case errors.IsNotFound(err): + logger.Info("K8s worker pod already gone before delete request completed.", append(deleteResultAttrs, "error", err)...) + default: + logger.Warn("K8s worker pod delete request failed.", append(deleteResultAttrs, "error", err)...) + } + } if removedWorker.client != nil { _ = removedWorker.client.Close() } diff --git a/controlplane/k8s_pool_reconcile.go b/controlplane/k8s_pool_reconcile.go index 0dc56970..c3d66f69 100644 --- a/controlplane/k8s_pool_reconcile.go +++ b/controlplane/k8s_pool_reconcile.go @@ -400,7 +400,14 @@ func (p *K8sWorkerPool) onPodTerminated(pod *corev1.Pod) { case <-w.done: // Already closed default: - slog.Warn("Worker pod terminated.", "worker", id, "worker_pod", pod.Name, "org", pod.Labels["duckgres/active-org"], "phase", pod.Status.Phase) + attrs := []any{ + "worker", id, + "worker_pod", pod.Name, + "org", pod.Labels["duckgres/active-org"], + "phase", pod.Status.Phase, + } + attrs = append(attrs, workerPodStatusLogAttrs(pod)...) + slog.Warn("Worker pod terminated.", attrs...) close(w.done) } } diff --git a/controlplane/k8s_pool_test.go b/controlplane/k8s_pool_test.go index c245922e..1eb3b4c1 100644 --- a/controlplane/k8s_pool_test.go +++ b/controlplane/k8s_pool_test.go @@ -6,10 +6,12 @@ import ( "context" "errors" "fmt" + "log/slog" "regexp" "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -25,6 +27,71 @@ import ( k8stesting "k8s.io/client-go/testing" ) +type capturedSlogRecord struct { + message string + attrs map[string]string +} + +type capturedSlogSink struct { + mu sync.Mutex + records []capturedSlogRecord +} + +func (s *capturedSlogSink) record(message string, attrs []slog.Attr) { + s.mu.Lock() + defer s.mu.Unlock() + record := capturedSlogRecord{message: message, attrs: make(map[string]string, len(attrs))} + for _, attr := range attrs { + record.attrs[attr.Key] = attr.Value.String() + } + s.records = append(s.records, record) +} + +func (s *capturedSlogSink) findMessage(message string) (capturedSlogRecord, bool) { + s.mu.Lock() + defer s.mu.Unlock() + for _, record := range s.records { + if record.message == message { + return record, true + } + } + return capturedSlogRecord{}, false +} + +type capturedSlogHandler struct { + sink *capturedSlogSink + attrs []slog.Attr +} + +func (h *capturedSlogHandler) Enabled(context.Context, slog.Level) bool { return true } + +func (h *capturedSlogHandler) Handle(_ context.Context, r slog.Record) error { + attrs := append([]slog.Attr{}, h.attrs...) + r.Attrs(func(attr slog.Attr) bool { + attrs = append(attrs, attr) + return true + }) + h.sink.record(r.Message, attrs) + return nil +} + +func (h *capturedSlogHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + next := &capturedSlogHandler{sink: h.sink} + next.attrs = append(append([]slog.Attr{}, h.attrs...), attrs...) + return next +} + +func (h *capturedSlogHandler) WithGroup(string) slog.Handler { return h } + +func captureSlog(t *testing.T) *capturedSlogSink { + t.Helper() + previous := slog.Default() + sink := &capturedSlogSink{} + slog.SetDefault(slog.New(&capturedSlogHandler{sink: sink})) + t.Cleanup(func() { slog.SetDefault(previous) }) + return sink +} + type captureRuntimeWorkerStore struct { mu sync.Mutex records []configstore.WorkerRecord @@ -1691,6 +1758,7 @@ func TestK8sPoolHealthCheckLoopCompletesSameLeaseAlreadyLost(t *testing.T) { func TestK8sPoolHealthCheckLoopCurrentLeaseDeletesPodAndNotifiesCrash(t *testing.T) { pool, cs := newTestK8sPool(t, 5) + logs := captureSlog(t) store := &captureRuntimeWorkerStore{ preloadedRecords: map[int]*configstore.WorkerRecord{ 8: { @@ -1711,15 +1779,38 @@ func TestK8sPoolHealthCheckLoopCurrentLeaseDeletesPodAndNotifiesCrash(t *testing if _, err := cs.CoreV1().Pods("default").Create(context.Background(), &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: "adopted-worker-8", Namespace: "default"}, - Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: "10.0.0.8"}, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + PodIP: "10.0.0.8", + ContainerStatuses: []corev1.ContainerStatus{{ + Name: "duckdb-worker", + RestartCount: 2, + State: corev1.ContainerState{ + Running: &corev1.ContainerStateRunning{StartedAt: metav1.NewTime(time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC))}, + }, + }}, + }, }, metav1.CreateOptions{}); err != nil { t.Fatalf("create worker pod: %v", err) } crashed := make(chan int, 1) + crashNotified := make(chan struct{}) + var crashNotifiedOnce sync.Once + var snapshotBeforeCrash atomic.Bool + cs.Fake.PrependReactor("get", "pods", func(k8stesting.Action) (bool, runtime.Object, error) { + select { + case <-crashNotified: + default: + snapshotBeforeCrash.Store(true) + } + return false, nil, nil + }) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() go pool.HealthCheckLoop(ctx, time.Millisecond, func(workerID int) { + crashNotifiedOnce.Do(func() { close(crashNotified) }) crashed <- workerID }, nil) @@ -1746,6 +1837,32 @@ func TestK8sPoolHealthCheckLoopCurrentLeaseDeletesPodAndNotifiesCrash(t *testing if got := podDeleteActionNames(cs); len(got) != 1 || got[0] != "adopted-worker-8" { t.Fatalf("expected health-check failure to delete adopted pod, got %v", got) } + if snapshotBeforeCrash.Load() { + t.Fatal("pod status snapshot must not run before crash notification") + } + deleteLog, ok := logs.findMessage("K8s worker unresponsive, deleting pod.") + if !ok { + t.Fatal("expected health-check pod delete log") + } + if deleteLog.attrs["worker_pod"] != "adopted-worker-8" { + t.Fatalf("expected worker_pod attr on delete log, got attrs %#v", deleteLog.attrs) + } + if deleteLog.attrs["pod_phase"] != "Running" { + t.Fatalf("expected pod_phase attr on delete log, got attrs %#v", deleteLog.attrs) + } + if deleteLog.attrs["container_name"] != "duckdb-worker" || deleteLog.attrs["container_state"] != "running" { + t.Fatalf("expected container summary attrs on delete log, got attrs %#v", deleteLog.attrs) + } + if deleteLog.attrs["container_restart_count"] != "2" { + t.Fatalf("expected container restart count attr on delete log, got attrs %#v", deleteLog.attrs) + } + deleteRequestedLog, ok := logs.findMessage("K8s worker pod delete requested.") + if !ok { + t.Fatal("expected post-delete request log") + } + if deleteRequestedLog.attrs["worker_pod"] != "adopted-worker-8" { + t.Fatalf("expected worker_pod attr on post-delete log, got attrs %#v", deleteRequestedLog.attrs) + } store.mu.Lock() markLostCalls := store.markLostCalls @@ -2910,17 +3027,38 @@ func TestK8sPoolRetireWorkerUsesTrackedPodName(t *testing.T) { func TestK8sPool_OnPodTerminated(t *testing.T) { pool, _ := newTestK8sPool(t, 5) + logs := captureSlog(t) done := make(chan struct{}) pool.workers[5] = &ManagedWorker{ID: 5, done: done} pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ + Name: "worker-5", Labels: map[string]string{ - "duckgres/worker-id": "5", + "duckgres/worker-id": "5", + "duckgres/active-org": "org-a", }, }, - Status: corev1.PodStatus{Phase: corev1.PodFailed}, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + Reason: "Evicted", + Message: "pod evicted by kubelet", + ContainerStatuses: []corev1.ContainerStatus{{ + Name: "duckdb-worker", + RestartCount: 1, + State: corev1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 137, + Signal: 9, + Reason: "OOMKilled", + Message: "container exceeded memory limit", + StartedAt: metav1.NewTime(time.Date(2026, 1, 2, 3, 4, 5, 0, time.UTC)), + FinishedAt: metav1.NewTime(time.Date(2026, 1, 2, 3, 14, 5, 0, time.UTC)), + }, + }, + }}, + }, } pool.onPodTerminated(pod) @@ -2932,6 +3070,24 @@ func TestK8sPool_OnPodTerminated(t *testing.T) { default: t.Fatal("done channel should be closed after pod termination") } + terminatedLog, ok := logs.findMessage("Worker pod terminated.") + if !ok { + t.Fatal("expected worker pod terminated log") + } + if terminatedLog.attrs["worker_pod"] != "worker-5" || terminatedLog.attrs["org"] != "org-a" { + t.Fatalf("expected worker identity attrs on terminated log, got attrs %#v", terminatedLog.attrs) + } + if terminatedLog.attrs["pod_phase"] != "Failed" || terminatedLog.attrs["pod_reason"] != "Evicted" { + t.Fatalf("expected pod status attrs on terminated log, got attrs %#v", terminatedLog.attrs) + } + if terminatedLog.attrs["container_name"] != "duckdb-worker" || + terminatedLog.attrs["container_state"] != "terminated" || + terminatedLog.attrs["container_reason"] != "OOMKilled" || + terminatedLog.attrs["container_exit_code"] != "137" || + terminatedLog.attrs["container_signal"] != "9" || + terminatedLog.attrs["container_restart_count"] != "1" { + t.Fatalf("expected container termination attrs on terminated log, got attrs %#v", terminatedLog.attrs) + } } func TestK8sPool_IdleReaper(t *testing.T) {