Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,21 +233,26 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M
return
}

status, err := c.duckling.Get(ctx, w.OrgID)
provisioningStatus, err := c.duckling.GetProvisioningStatus(ctx, w.OrgID)
if err != nil {
log.Warn("Failed to get Duckling CR status.", "error", err)
return
}
status := provisioningStatus.DucklingStatus

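// Check for Crossplane failure — only fail on persistent errors.
// Crossplane resources commonly flap Synced=False transiently (e.g., IAM
// eventual consistency, metadata-store endpoint DNS propagation), so we only
// transition to failed once more than 10 minutes have passed since startedAt,
// giving transient errors time to resolve. A non-empty data-store readiness
// message is held to the same 10-minute threshold and is used as the reported
// message when there is no Synced=False message.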
if status.SyncedFalseMessage != "" && time.Since(startedAt) > 10*time.Minute {
log.Warn("Crossplane sync failure.", "message", status.SyncedFalseMessage)
if (status.SyncedFalseMessage != "" || provisioningStatus.DataStoreReadiness.Message != "") && time.Since(startedAt) > 10*time.Minute {
message := status.SyncedFalseMessage
if message == "" {
message = provisioningStatus.DataStoreReadiness.Message
}
log.Warn("Crossplane sync failure.", "message", message)
_ = c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateProvisioning, map[string]interface{}{
"state": configstore.ManagedWarehouseStateFailed,
"status_message": fmt.Sprintf("Crossplane error: %s", status.SyncedFalseMessage),
"status_message": fmt.Sprintf("Crossplane error: %s", message),
"failed_at": time.Now().UTC(),
})
return
Expand All @@ -259,7 +264,7 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M
// state transitions are stored in the config store.
updates := map[string]interface{}{}

if status.DataStore.BucketName != "" && w.S3State != configstore.ManagedWarehouseStateReady {
if provisioningStatus.DataStoreReadiness.Ready && w.S3State != configstore.ManagedWarehouseStateReady {
updates["s3_state"] = configstore.ManagedWarehouseStateReady
}

Expand Down Expand Up @@ -488,14 +493,10 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag
}

log.Info("Deleting Duckling CR.")
if err := c.duckling.Delete(ctx, w.OrgID); err != nil {
// Only proceed if the CR is already gone (NotFound). For other errors
// (network, RBAC, etc.) we retry on the next reconcile pass to avoid
// marking as deleted while AWS resources still exist.
if !apierrors.IsNotFound(err) {
log.Warn("Failed to delete Duckling CR, will retry.", "error", err)
return
}
gone, err := c.duckling.EnsureDeleted(ctx, w.OrgID)
if err != nil {
log.Warn("Failed to delete Duckling CR, will retry.", "error", err)
return
}

// Tear down the per-org Lakekeeper instance the control plane provisioned
Expand All @@ -515,6 +516,11 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag
}
}

if !gone {
log.Info("Duckling CR delete requested; waiting for Kubernetes finalizers to finish.")
return
}

if err := c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateDeleting, map[string]interface{}{
"state": configstore.ManagedWarehouseStateDeleted,
"status_message": "Resources deleted",
Expand Down
12 changes: 6 additions & 6 deletions controlplane/provisioner/controller_lakekeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) {
k8sClient, dyn, _ := newFakeLakekeeperClient()
p := NewLakekeeperProvisioner(newFakeStore(), k8sClient)

// Seed a legacy-named CR carrying the org label. Its name intentionally
// differs from LakekeeperResourceName("acme") to prove the patch is matched
// by label, not by a recomputed name (the post-#632 hyphenation bug).
// Seed an existing CR carrying the org label. Its name intentionally differs
// from LakekeeperResourceName("acme") to prove the patch is matched by label,
// not by a recomputed name.
seed := &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "lakekeeper.k8s.lakekeeper.io/v1alpha1",
"kind": "Lakekeeper",
"metadata": map[string]interface{}{
"name": "lakekeeper-acme-legacy",
"name": "lakekeeper-acme-existing",
"namespace": k8sClient.namespace,
"labels": map[string]interface{}{"duckgres/active-org": "acme"},
},
Expand Down Expand Up @@ -156,8 +156,8 @@ func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) {
if called {
t.Errorf("inputs resolver should NOT be called for pod-shape drift correction")
}
// The existing legacy-named CR was patched in place by label.
got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), "lakekeeper-acme-legacy", metav1.GetOptions{})
// The existing differently named CR was patched in place by label.
got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), "lakekeeper-acme-existing", metav1.GetOptions{})
if err != nil {
t.Fatalf("get patched CR: %v", err)
}
Expand Down
Loading
Loading