diff --git a/controlplane/provisioner/controller.go b/controlplane/provisioner/controller.go index ca6047d9..811b3183 100644 --- a/controlplane/provisioner/controller.go +++ b/controlplane/provisioner/controller.go @@ -233,21 +233,26 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M return } - status, err := c.duckling.Get(ctx, w.OrgID) + provisioningStatus, err := c.duckling.GetProvisioningStatus(ctx, w.OrgID) if err != nil { log.Warn("Failed to get Duckling CR status.", "error", err) return } + status := provisioningStatus.DucklingStatus // Check for Crossplane failure — only fail on persistent sync errors. // Crossplane resources commonly flap Synced=False transiently (e.g., IAM // eventual consistency, metadata-store endpoint DNS propagation), so we only transition // to failed if 10+ minutes have passed, giving transient errors time to resolve. - if status.SyncedFalseMessage != "" && time.Since(startedAt) > 10*time.Minute { - log.Warn("Crossplane sync failure.", "message", status.SyncedFalseMessage) + if (status.SyncedFalseMessage != "" || provisioningStatus.DataStoreReadiness.Message != "") && time.Since(startedAt) > 10*time.Minute { + message := status.SyncedFalseMessage + if message == "" { + message = provisioningStatus.DataStoreReadiness.Message + } + log.Warn("Crossplane sync failure.", "message", message) _ = c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateProvisioning, map[string]interface{}{ "state": configstore.ManagedWarehouseStateFailed, - "status_message": fmt.Sprintf("Crossplane error: %s", status.SyncedFalseMessage), + "status_message": fmt.Sprintf("Crossplane error: %s", message), "failed_at": time.Now().UTC(), }) return @@ -259,7 +264,7 @@ func (c *Controller) reconcileProvisioning(ctx context.Context, w *configstore.M // state transitions are stored in the config store. updates := map[string]interface{}{} - if status.DataStore.BucketName != "" && w.S3State != configstore.ManagedWarehouseStateReady { + if provisioningStatus.DataStoreReadiness.Ready && w.S3State != configstore.ManagedWarehouseStateReady { updates["s3_state"] = configstore.ManagedWarehouseStateReady } @@ -488,14 +493,10 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag } log.Info("Deleting Duckling CR.") - if err := c.duckling.Delete(ctx, w.OrgID); err != nil { - // Only proceed if the CR is already gone (NotFound). For other errors - // (network, RBAC, etc.) we retry on the next reconcile pass to avoid - // marking as deleted while AWS resources still exist. - if !apierrors.IsNotFound(err) { - log.Warn("Failed to delete Duckling CR, will retry.", "error", err) - return - } + gone, err := c.duckling.EnsureDeleted(ctx, w.OrgID) + if err != nil { + log.Warn("Failed to delete Duckling CR, will retry.", "error", err) + return } // Tear down the per-org Lakekeeper instance the control plane provisioned @@ -515,6 +516,11 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag } } + if !gone { + log.Info("Duckling CR delete requested; waiting for Kubernetes finalizers to finish.") + return + } + if err := c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateDeleting, map[string]interface{}{ "state": configstore.ManagedWarehouseStateDeleted, "status_message": "Resources deleted", diff --git a/controlplane/provisioner/controller_lakekeeper_test.go b/controlplane/provisioner/controller_lakekeeper_test.go index 1f4430a3..e92a6bd5 100644 --- a/controlplane/provisioner/controller_lakekeeper_test.go +++ b/controlplane/provisioner/controller_lakekeeper_test.go @@ -117,14 +117,14 @@ func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) { k8sClient, dyn, _ := newFakeLakekeeperClient() p := NewLakekeeperProvisioner(newFakeStore(), k8sClient) - // Seed a legacy-named CR carrying the org label. Its name intentionally - // differs from LakekeeperResourceName("acme") to prove the patch is matched - // by label, not by a recomputed name (the post-#632 hyphenation bug). + // Seed an existing CR carrying the org label. Its name intentionally differs + // from LakekeeperResourceName("acme") to prove the patch is matched by label, + // not by a recomputed name. seed := &unstructured.Unstructured{Object: map[string]interface{}{ "apiVersion": "lakekeeper.k8s.lakekeeper.io/v1alpha1", "kind": "Lakekeeper", "metadata": map[string]interface{}{ - "name": "lakekeeper-acme-legacy", + "name": "lakekeeper-acme-existing", "namespace": k8sClient.namespace, "labels": map[string]interface{}{"duckgres/active-org": "acme"}, }, @@ -156,8 +156,8 @@ func TestReconcileLakekeeper_DriftCorrectsWhenAlreadyProvisioned(t *testing.T) { if called { t.Errorf("inputs resolver should NOT be called for pod-shape drift correction") } - // The existing legacy-named CR was patched in place by label. - got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), "lakekeeper-acme-legacy", metav1.GetOptions{}) + // The existing differently named CR was patched in place by label. + got, err := dyn.Resource(lakekeeperGVR).Namespace(k8sClient.namespace).Get(context.Background(), "lakekeeper-acme-existing", metav1.GetOptions{}) if err != nil { t.Fatalf("get patched CR: %v", err) } diff --git a/controlplane/provisioner/controller_test.go b/controlplane/provisioner/controller_test.go index 301677ea..a8cfe72d 100644 --- a/controlplane/provisioner/controller_test.go +++ b/controlplane/provisioner/controller_test.go @@ -12,9 +12,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - dynamicfake "k8s.io/client-go/dynamic/fake" "github.com/posthog/duckgres/controlplane/configstore" ) @@ -162,23 +159,6 @@ func (s *fakeStore) UpdateIcebergConfig(orgID string, updates map[string]interfa // Compile-time check that fakeStore satisfies WarehouseStore. var _ WarehouseStore = (*fakeStore)(nil) -func newFakeDucklingClient() (*DucklingClient, *dynamicfake.FakeDynamicClient) { - scheme := runtime.NewScheme() - scheme.AddKnownTypeWithName(schema.GroupVersionKind{ - Group: "k8s.posthog.com", - Version: "v1alpha1", - Kind: "Duckling", - }, &unstructured.Unstructured{}) - scheme.AddKnownTypeWithName(schema.GroupVersionKind{ - Group: "k8s.posthog.com", - Version: "v1alpha1", - Kind: "DucklingList", - }, &unstructured.UnstructuredList{}) - - fakeClient := dynamicfake.NewSimpleDynamicClient(scheme) - return NewDucklingClientWithDynamic(fakeClient), fakeClient -} - func TestReconcilePendingCreatesCR(t *testing.T) { dc, fakeK8s := newFakeDucklingClient() fs := newFakeStore() @@ -520,6 +500,25 @@ func TestReconcileProvisioningAllReady(t *testing.T) { } // Create a Duckling CR with all status fields populated + status := map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "external", + "endpoint": "org-b.cluster.us-east-1.rds.amazonaws.com", + "password": "supersecret123", + "user": "postgres", + "database": "postgres", + }, + "dataStore": map[string]interface{}{ + "type": "s3bucket", + "bucketName": "org-b-bucket", + }, + "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-b", + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": "True"}, + map[string]interface{}{"type": "Synced", "status": "True"}, + }, + } + addS3BucketRef(status, "org-b-bucket") cr := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "k8s.posthog.com/v1alpha1", @@ -528,34 +527,12 @@ func TestReconcileProvisioningAllReady(t *testing.T) { "name": ducklingName("org-b"), "namespace": ducklingNamespace, }, - "status": map[string]interface{}{ - "metadataStore": map[string]interface{}{ - "type": "external", - "endpoint": "org-b.cluster.us-east-1.rds.amazonaws.com", - "password": "supersecret123", - "user": "postgres", - "database": "postgres", - }, - "dataStore": map[string]interface{}{ - "type": "s3bucket", - "bucketName": "org-b-bucket", - }, - "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-b", - "conditions": []interface{}{ - map[string]interface{}{ - "type": "Ready", - "status": "True", - }, - map[string]interface{}{ - "type": "Synced", - "status": "True", - }, - }, - }, + "status": status, }, } ctx := context.Background() + seedReadyS3Bucket(t, fakeK8s, ctx, "org-b-bucket") _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}) if err != nil { t.Fatalf("failed to create test CR: %v", err) @@ -604,6 +581,26 @@ func TestReconcileProvisioningProbeFailsKeepsProvisioning(t *testing.T) { CreatedAt: time.Now(), } + status := map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "external", + "endpoint": "org-probe.cluster.us-east-1.rds.amazonaws.com", + "pgbouncerEndpoint": "pgbouncer-duckling-org-probe.ducklings.svc.cluster.local:6543", + "password": "supersecret123", + "user": "postgres", + "database": "postgres", + }, + "dataStore": map[string]interface{}{ + "type": "s3bucket", + "bucketName": "org-probe-bucket", + }, + "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-probe", + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": "True"}, + map[string]interface{}{"type": "Synced", "status": "True"}, + }, + } + addS3BucketRef(status, "org-probe-bucket") cr := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "k8s.posthog.com/v1alpha1", @@ -612,29 +609,12 @@ func TestReconcileProvisioningProbeFailsKeepsProvisioning(t *testing.T) { "name": ducklingName("org-probe"), "namespace": ducklingNamespace, }, - "status": map[string]interface{}{ - "metadataStore": map[string]interface{}{ - "type": "external", - "endpoint": "org-probe.cluster.us-east-1.rds.amazonaws.com", - "pgbouncerEndpoint": "pgbouncer-duckling-org-probe.ducklings.svc.cluster.local:6543", - "password": "supersecret123", - "user": "postgres", - "database": "postgres", - }, - "dataStore": map[string]interface{}{ - "type": "s3bucket", - "bucketName": "org-probe-bucket", - }, - "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-probe", - "conditions": []interface{}{ - map[string]interface{}{"type": "Ready", "status": "True"}, - map[string]interface{}{"type": "Synced", "status": "True"}, - }, - }, + "status": status, }, } ctx := context.Background() + seedReadyS3Bucket(t, fakeK8s, ctx, "org-probe-bucket") if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}); err != nil { t.Fatalf("failed to create test CR: %v", err) } @@ -694,6 +674,23 @@ func TestReconcileProvisioningProbesPgBouncerWhenEnabled(t *testing.T) { CreatedAt: time.Now(), } + status := map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "external", + "endpoint": "org-pgb.cluster.us-east-1.rds.amazonaws.com", + "pgbouncerEndpoint": "pgbouncer-duckling-org-pgb.ducklings.svc.cluster.local:6543", + "password": "supersecret123", + "user": "postgres", + "database": "postgres", + }, + "dataStore": map[string]interface{}{"type": "s3bucket", "bucketName": "org-pgb-bucket"}, + "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-pgb", + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": "True"}, + map[string]interface{}{"type": "Synced", "status": "True"}, + }, + } + addS3BucketRef(status, "org-pgb-bucket") cr := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "k8s.posthog.com/v1alpha1", @@ -702,26 +699,12 @@ func TestReconcileProvisioningProbesPgBouncerWhenEnabled(t *testing.T) { "name": ducklingName("org-pgb"), "namespace": ducklingNamespace, }, - "status": map[string]interface{}{ - "metadataStore": map[string]interface{}{ - "type": "external", - "endpoint": "org-pgb.cluster.us-east-1.rds.amazonaws.com", - "pgbouncerEndpoint": "pgbouncer-duckling-org-pgb.ducklings.svc.cluster.local:6543", - "password": "supersecret123", - "user": "postgres", - "database": "postgres", - }, - "dataStore": map[string]interface{}{"type": "s3bucket", "bucketName": "org-pgb-bucket"}, - "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-pgb", - "conditions": []interface{}{ - map[string]interface{}{"type": "Ready", "status": "True"}, - map[string]interface{}{"type": "Synced", "status": "True"}, - }, - }, + "status": status, }, } ctx := context.Background() + seedReadyS3Bucket(t, fakeK8s, ctx, "org-pgb-bucket") if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}); err != nil { t.Fatalf("failed to create test CR: %v", err) } @@ -745,7 +728,7 @@ func TestReconcileProvisioningProbesPgBouncerWhenEnabled(t *testing.T) { } } -func TestReconcileDeletingDeletesCR(t *testing.T) { +func TestReconcileDeletingWaitsForCRToDisappear(t *testing.T) { dc, fakeK8s := newFakeDucklingClient() fs := newFakeStore() fs.warehouses["org-c"] = &configstore.ManagedWarehouse{ @@ -780,9 +763,13 @@ func TestReconcileDeletingDeletesCR(t *testing.T) { return } - // Verify state transitioned to deleted + if fs.warehouses["org-c"].State != configstore.ManagedWarehouseStateDeleting { + t.Fatalf("expected deleting state after delete request, got %q", fs.warehouses["org-c"].State) + } + + ctrl.reconcile(ctx) if fs.warehouses["org-c"].State != configstore.ManagedWarehouseStateDeleted { - t.Fatalf("expected deleted state, got %q", fs.warehouses["org-c"].State) + t.Fatalf("expected deleted state after CR disappeared, got %q", fs.warehouses["org-c"].State) } } @@ -1042,6 +1029,25 @@ func TestReconcileProvisioningCnpgShardGatedOnIceberg(t *testing.T) { }, } + status := map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": configstore.MetadataStoreKindCnpgShard, + "endpoint": "shard-001-pooler.cnpg-shards.svc.cluster.local", + "password": "from-provider-sql", + "user": "lakekeeper_org_cs", + "database": "lakekeeper_org_cs", + }, + "dataStore": map[string]interface{}{ + "type": "s3bucket", + "bucketName": "org-cs-bucket", + }, + "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-cs", + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": "True"}, + map[string]interface{}{"type": "Synced", "status": "True"}, + }, + } + addS3BucketRef(status, "org-cs-bucket") cr := &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "k8s.posthog.com/v1alpha1", @@ -1050,27 +1056,11 @@ func TestReconcileProvisioningCnpgShardGatedOnIceberg(t *testing.T) { "name": ducklingName("org-cs"), "namespace": ducklingNamespace, }, - "status": map[string]interface{}{ - "metadataStore": map[string]interface{}{ - "type": configstore.MetadataStoreKindCnpgShard, - "endpoint": "shard-001-pooler.cnpg-shards.svc.cluster.local", - "password": "from-provider-sql", - "user": "lakekeeper_org_cs", - "database": "lakekeeper_org_cs", - }, - "dataStore": map[string]interface{}{ - "type": "s3bucket", - "bucketName": "org-cs-bucket", - }, - "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-org-cs", - "conditions": []interface{}{ - map[string]interface{}{"type": "Ready", "status": "True"}, - map[string]interface{}{"type": "Synced", "status": "True"}, - }, - }, + "status": status, }, } ctx := context.Background() + seedReadyS3Bucket(t, fakeK8s, ctx, "org-cs-bucket") if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}); err != nil { t.Fatalf("create CR: %v", err) } @@ -1206,77 +1196,37 @@ func TestReconcilePendingCreatesDuckLakeExternalCR(t *testing.T) { } } -// TestDucklingCreateExternalRequiresFields verifies the Create guard: an -// external CR is rejected without endpoint + passwordAwsSecret. -func TestDucklingCreateExternalRequiresFields(t *testing.T) { +func TestReconcileProvisioningDoesNotMarkS3ReadyWhenBucketRefNotReady(t *testing.T) { dc, fakeK8s := newFakeDucklingClient() - ctx := context.Background() - if err := dc.Create(ctx, "ext-bad", CreateOptions{MetadataStoreType: configstore.MetadataStoreKindExternal}); err == nil { - t.Fatal("expected error creating external CR without endpoint/passwordAwsSecret") - } - if _, getErr := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, ducklingName("ext-bad"), metav1.GetOptions{}); getErr == nil { - t.Error("CR should not have been created when external validation failed") - } -} - -// TestDucklingCreateExternalDataStoreRequiresBucket verifies the dataStore -// guard: an external dataStore needs a bucket name. -func TestDucklingCreateExternalDataStoreRequiresBucket(t *testing.T) { - dc, _ := newFakeDucklingClient() - err := dc.Create(context.Background(), "ext-nobucket", CreateOptions{ - MetadataStoreType: configstore.MetadataStoreKindExternal, - ExternalEndpoint: "h", - ExternalPasswordAWSSecret: "s", - DataStoreType: "external", - }) - if err == nil { - t.Fatal("expected error: external dataStore without a bucket name") + fs := newFakeStore() + fs.warehouses["org-bucket"] = &configstore.ManagedWarehouse{ + OrgID: "org-bucket", + State: configstore.ManagedWarehouseStateProvisioning, + CreatedAt: time.Now(), } -} - -// TestDucklingGetFallsBackToLegacyName verifies the backward-compat path: a CR -// created before ducklingName preserved hyphens (i.e. named with hyphens -// stripped) is still found when looked up by its hyphenated org ID. Without -// this, switching ducklingName to keep hyphens would orphan existing prod -// ducklings whose org IDs contain hyphens (e.g. UUID-named tenants). -func TestDucklingGetFallsBackToLegacyName(t *testing.T) { - dc, fakeK8s := newFakeDucklingClient() ctx := context.Background() - org := "018d351a-9ff7-0000-eaff-4628875ad045" - legacy := legacyDucklingName(org) // "018d351a9ff70000eaff4628875ad045" - if ducklingName(org) == legacy { - t.Fatal("test premise: hyphenated org id must differ from its legacy de-hyphenated name") - } - - // Seed a CR under the legacy (de-hyphenated) name, as the old code would have. - cr := &unstructured.Unstructured{Object: map[string]interface{}{ - "apiVersion": "k8s.posthog.com/v1alpha1", - "kind": "Duckling", - "metadata": map[string]interface{}{"name": legacy, "namespace": ducklingNamespace}, - "status": map[string]interface{}{"iamRoleArn": "arn:aws:iam::123:role/duckling-" + legacy}, - }} + bucketName := "posthog-duckling-org-bucket-mw-prod-us" + status := readyDucklingStatus(bucketName) + addS3BucketRef(status, bucketName) + cr := ducklingCR(ducklingName("org-bucket"), status) if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}); err != nil { - t.Fatalf("seed legacy CR: %v", err) + t.Fatalf("seed Duckling CR: %v", err) } - - st, err := dc.Get(ctx, org) - if err != nil { - t.Fatalf("Get must fall back to the legacy de-hyphenated name, got: %v", err) - } - if st.IAMRoleARN == "" { - t.Error("expected to parse the legacy CR's status") + if _, err := fakeK8s.Resource(s3BucketGVR).Create(ctx, bucketCR(bucketName, false, "InvalidBucketName"), metav1.CreateOptions{}); err != nil { + t.Fatalf("seed Bucket CR: %v", err) } - // And iceberg/pgbouncer reads + delete must resolve it too. - if _, err := dc.GetIcebergEnabled(ctx, org); err != nil { - t.Errorf("GetIcebergEnabled fallback: %v", err) - } - if err := dc.Delete(ctx, org); err != nil { - t.Errorf("Delete fallback: %v", err) + ctrl := NewControllerWithClient(fs, dc, time.Second) + ctrl.SetProbe(func(context.Context, string, string, string, string, string) error { return nil }) + ctrl.reconcile(ctx) + + w := fs.warehouses["org-bucket"] + if w.S3State == configstore.ManagedWarehouseStateReady { + t.Fatalf("s3_state should not be ready while Bucket is not ready") } - if _, getErr := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, legacy, metav1.GetOptions{}); getErr == nil { - t.Error("legacy CR should have been deleted via the fallback") + if w.State == configstore.ManagedWarehouseStateReady { + t.Fatalf("warehouse should not be ready while Bucket is not ready") } } diff --git a/controlplane/provisioner/k8s_client.go b/controlplane/provisioner/k8s_client.go index 02c6f8be..3c0c2270 100644 --- a/controlplane/provisioner/k8s_client.go +++ b/controlplane/provisioner/k8s_client.go @@ -25,7 +25,44 @@ var ducklingGVR = schema.GroupVersionResource{ Resource: "ducklings", } +var s3BucketGVR = schema.GroupVersionResource{ + Group: "s3.aws.upbound.io", + Version: "v1beta1", + Resource: "buckets", +} + const ducklingNamespace = "ducklings" +const statusField = "status" +const specField = "spec" +const crossplaneSpecField = "crossplane" +const crossplaneResourceRefsField = "resourceRefs" +const s3BucketAPIVersion = "s3.aws.upbound.io/v1beta1" +const crossplaneBucketKind = "Bucket" + +var crossplaneResourceRefBlockPaths = [][]string{ + {statusField}, + {specField}, + {specField, crossplaneSpecField}, +} + +type DataStoreReadinessSource string + +const ( + DataStoreReadinessSourceDucklingStatus DataStoreReadinessSource = "duckling_status" + DataStoreReadinessSourceComposedBucket DataStoreReadinessSource = "composed_bucket" + DataStoreReadinessSourceComposedError DataStoreReadinessSource = "composed_resource_error" +) + +type DataStoreReadiness struct { + Ready bool + Source DataStoreReadinessSource + Message string +} + +type DucklingProvisioningStatus struct { + *DucklingStatus + DataStoreReadiness DataStoreReadiness +} // DucklingStatus holds the parsed status from a Duckling CR. // The Duckling composition provisions AWS infrastructure (S3, IAM) and the @@ -87,16 +124,7 @@ func NewDucklingClientWithDynamic(client dynamic.Interface) *DucklingClient { return &DucklingClient{client: client} } -// ducklingName is the k8s/AWS resource name derived from an org ID, used for -// the Duckling CR, the IAM role (duckling-), the S3 bucket, the -// Lakekeeper CR/SA/Secret, etc. Org IDs are validated as DNS-1123 labels at -// provision time (lowercase alphanumerics + hyphens), so this only lowercases: -// hyphens are preserved, keeping in-cluster names human-readable and injective -// with the org ID. -// -// (It used to strip hyphens to keep per-org Aurora RDS cluster identifiers -// short, but the control plane no longer provisions per-org Aurora clusters, -// and stripping was lossy — "a-b" and "ab" collided.) +// ducklingName is the Kubernetes Duckling CR name derived from an org ID. func ducklingName(orgID string) string { return strings.ToLower(orgID) } @@ -117,15 +145,6 @@ func pgIdentSuffix(orgID string) string { return pgIdentSanitizeRe.ReplaceAllString(strings.ToLower(orgID), "_") } -// legacyDucklingName is the pre-hyphen-preservation transform (hyphens -// stripped). Retained ONLY so lookups can still find Duckling CRs created -// before ducklingName started preserving hyphens — e.g. a CR named -// "018d351a9ff70000eaff4628875ad045" for org "018d351a-9ff7-0000-eaff-...". -// New CRs are always created under ducklingName (hyphen-preserving). -func legacyDucklingName(orgID string) string { - return strings.ReplaceAll(strings.ToLower(orgID), "-", "") -} - // CreateOptions carries per-org knobs that shape the generated Duckling CR. type CreateOptions struct { // MetadataStoreType selects the Duckling's metadata-store backend. The @@ -270,23 +289,13 @@ func (d *DucklingClient) Create(ctx context.Context, orgID string, opts CreateOp return nil } -// getCR fetches the org's Duckling CR, trying the current hyphen-preserving -// name first and falling back to the legacy de-hyphenated name for CRs created -// before ducklingName preserved hyphens. Returns the CR and the name it was -// found under (callers that mutate need the actual name). When neither exists, -// returns the current-name NotFound error so callers see the new scheme. +// getCR fetches the org's Duckling CR. +// Returns the CR and the name it was found under (callers that mutate need the +// actual name). func (d *DucklingClient) getCR(ctx context.Context, orgID string) (*unstructured.Unstructured, string, error) { name := ducklingName(orgID) cr, err := d.client.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, name, metav1.GetOptions{}) - if err == nil { - return cr, name, nil - } - if legacy := legacyDucklingName(orgID); legacy != name && apierrors.IsNotFound(err) { - if lcr, lerr := d.client.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, legacy, metav1.GetOptions{}); lerr == nil { - return lcr, legacy, nil - } - } - return nil, name, err + return cr, name, err } // Get fetches the Duckling CR and parses its status. @@ -298,8 +307,22 @@ func (d *DucklingClient) Get(ctx context.Context, orgID string) (*DucklingStatus return parseDucklingStatus(cr) } -// Delete removes the Duckling CR for the given org. Resolves the legacy name so -// pre-rename CRs are still deletable. +func (d *DucklingClient) GetProvisioningStatus(ctx context.Context, orgID string) (*DucklingProvisioningStatus, error) { + cr, name, err := d.getCR(ctx, orgID) + if err != nil { + return nil, fmt.Errorf("get duckling CR %q: %w", name, err) + } + status, err := parseDucklingStatus(cr) + if err != nil { + return nil, err + } + return &DucklingProvisioningStatus{ + DucklingStatus: status, + DataStoreReadiness: d.dataStoreReadiness(ctx, cr, status), + }, nil +} + +// Delete removes the Duckling CR for the given org. func (d *DucklingClient) Delete(ctx context.Context, orgID string) error { _, name, err := d.getCR(ctx, orgID) if apierrors.IsNotFound(err) { @@ -312,6 +335,23 @@ func (d *DucklingClient) Delete(ctx context.Context, orgID string) error { return nil } +func (d *DucklingClient) EnsureDeleted(ctx context.Context, orgID string) (bool, error) { + _, name, err := d.getCR(ctx, orgID) + if apierrors.IsNotFound(err) { + return true, nil + } + if err != nil { + return false, fmt.Errorf("resolve duckling CR for delete %q: %w", name, err) + } + if err := d.client.Resource(ducklingGVR).Namespace(ducklingNamespace).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + return false, fmt.Errorf("delete duckling CR %q: %w", name, err) + } + return false, nil +} + // GetPgBouncerEnabled reads spec.metadataStore.pgbouncer.enabled from the // Duckling CR. Missing blocks (composition at an older schema, CR never // carried a pgbouncer section) are reported as false — same as an explicit @@ -501,6 +541,101 @@ func parseDucklingStatus(cr *unstructured.Unstructured) (*DucklingStatus, error) return ds, nil } +func (d *DucklingClient) dataStoreReadiness(ctx context.Context, cr *unstructured.Unstructured, status *DucklingStatus) DataStoreReadiness { + if status == nil || status.DataStore.BucketName == "" { + return DataStoreReadiness{ + Ready: false, + Source: DataStoreReadinessSourceDucklingStatus, + Message: "Duckling status does not include a data store bucket name", + } + } + if status.DataStore.Type == "external" { + return DataStoreReadiness{Ready: true, Source: DataStoreReadinessSourceDucklingStatus} + } + refName := bucketResourceRefName(cr) + if refName == "" { + return DataStoreReadiness{ + Ready: false, + Source: DataStoreReadinessSourceDucklingStatus, + Message: "Duckling status has a bucket name but no composed Bucket resourceRef", + } + } + bucket, err := d.client.Resource(s3BucketGVR).Get(ctx, refName, metav1.GetOptions{}) + if err != nil { + return DataStoreReadiness{ + Ready: false, + Source: DataStoreReadinessSourceComposedError, + Message: fmt.Sprintf("read composed S3 bucket %q: %v", refName, err), + } + } + ready, message := crossplaneReady(bucket) + if !ready && message == "" { + message = fmt.Sprintf("S3 bucket %q is not ready", refName) + } + return DataStoreReadiness{Ready: ready, Source: DataStoreReadinessSourceComposedBucket, Message: message} +} + +func bucketResourceRefName(cr *unstructured.Unstructured) string { + return composedResourceRefName(cr, s3BucketAPIVersion, crossplaneBucketKind) +} + +func composedResourceRefName(cr *unstructured.Unstructured, apiVersion string, kind string) string { + for _, path := range crossplaneResourceRefBlockPaths { + block, ok, _ := unstructured.NestedMap(cr.Object, path...) + if !ok { + continue + } + if name := resourceRefNameFromBlock(block, apiVersion, kind); name != "" { + return name + } + } + return "" +} + +func resourceRefNameFromBlock(block map[string]interface{}, apiVersion string, kind string) string { + refs, _ := block[crossplaneResourceRefsField].([]interface{}) + for _, ref := range refs { + refMap, ok := ref.(map[string]interface{}) + if !ok { + continue + } + if getNestedString(refMap, "apiVersion") == apiVersion && getNestedString(refMap, "kind") == kind { + return getNestedString(refMap, "name") + } + } + return "" +} + +func crossplaneReady(obj *unstructured.Unstructured) (bool, string) { + status, ok := obj.Object["status"].(map[string]interface{}) + if !ok { + return false, "" + } + conditions, _ := status["conditions"].([]interface{}) + ready := false + var message string + for _, cond := range conditions { + condMap, ok := cond.(map[string]interface{}) + if !ok { + continue + } + condType := getNestedString(condMap, "type") + condStatus := getNestedString(condMap, "status") + if condType == "Ready" { + ready = condStatus == "True" + if !ready { + message = getNestedString(condMap, "message") + } + } + if condType == "Synced" && condStatus == "False" { + if syncMessage := getNestedString(condMap, "message"); syncMessage != "" { + message = syncMessage + } + } + } + return ready, message +} + func getNestedString(obj map[string]interface{}, key string) string { v, _ := obj[key].(string) return v diff --git a/controlplane/provisioner/k8s_client_test.go b/controlplane/provisioner/k8s_client_test.go new file mode 100644 index 00000000..bf1539db --- /dev/null +++ b/controlplane/provisioner/k8s_client_test.go @@ -0,0 +1,260 @@ +//go:build kubernetes + +package provisioner + +import ( + "context" + "strings" + "testing" + + "github.com/posthog/duckgres/controlplane/configstore" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + dynamicfake "k8s.io/client-go/dynamic/fake" +) + +func newFakeDucklingClient() (*DucklingClient, *dynamicfake.FakeDynamicClient) { + scheme := runtime.NewScheme() + fakeClient := dynamicfake.NewSimpleDynamicClientWithCustomListKinds(scheme, map[schema.GroupVersionResource]string{ + ducklingGVR: "DucklingList", + s3BucketGVR: "BucketList", + }) + return NewDucklingClientWithDynamic(fakeClient), fakeClient +} + +func ducklingCR(name string, status map[string]interface{}) *unstructured.Unstructured { + return &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": "k8s.posthog.com/v1alpha1", + "kind": "Duckling", + "metadata": map[string]interface{}{ + "name": name, + "namespace": ducklingNamespace, + }, + "status": status, + }} +} + +func readyDucklingStatus(bucketName string) map[string]interface{} { + return map[string]interface{}{ + "metadataStore": map[string]interface{}{ + "type": "external", + "endpoint": "org-bucket.cluster.us-east-1.rds.amazonaws.com", + "password": "supersecret123", + "user": "postgres", + "database": "postgres", + }, + "dataStore": map[string]interface{}{ + "type": "s3bucket", + "bucketName": bucketName, + "s3Region": "us-east-1", + }, + "iamRoleArn": "arn:aws:iam::123456789012:role/duckling-orgbucket", + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": "True"}, + map[string]interface{}{"type": "Synced", "status": "True"}, + }, + } +} + +func addS3BucketRef(status map[string]interface{}, bucketName string) { + status["resourceRefs"] = []interface{}{ + map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "name": bucketName, + }, + } +} + +func addCrossplaneS3BucketRef(cr *unstructured.Unstructured, bucketName string) { + cr.Object["spec"] = map[string]interface{}{ + crossplaneSpecField: map[string]interface{}{ + crossplaneResourceRefsField: []interface{}{ + map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "name": bucketName, + }, + }, + }, + } +} + +func bucketCR(name string, ready bool, message string) *unstructured.Unstructured { + readyStatus := "False" + syncedStatus := "False" + if ready { + readyStatus = "True" + syncedStatus = "True" + } + return &unstructured.Unstructured{Object: map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "metadata": map[string]interface{}{"name": name}, + "status": map[string]interface{}{ + "conditions": []interface{}{ + map[string]interface{}{"type": "Ready", "status": readyStatus}, + map[string]interface{}{"type": "Synced", "status": syncedStatus, "message": message}, + }, + }, + }} +} + +func seedReadyS3Bucket(t *testing.T, fakeK8s *dynamicfake.FakeDynamicClient, ctx context.Context, bucketName string) { + t.Helper() + if _, err := fakeK8s.Resource(s3BucketGVR).Create(ctx, bucketCR(bucketName, true, ""), metav1.CreateOptions{}); err != nil { + t.Fatalf("seed ready S3 Bucket %q: %v", bucketName, err) + } +} + +func TestDucklingCreateExternalRequiresFields(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + ctx := context.Background() + if err := dc.Create(ctx, "ext-bad", CreateOptions{MetadataStoreType: configstore.MetadataStoreKindExternal}); err == nil { + t.Fatal("expected error creating external CR without endpoint/passwordAwsSecret") + } + if _, getErr := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Get(ctx, ducklingName("ext-bad"), metav1.GetOptions{}); getErr == nil { + t.Error("CR should not have been created when external validation failed") + } +} + +func TestDucklingCreateExternalDataStoreRequiresBucket(t *testing.T) { + dc, _ := newFakeDucklingClient() + err := dc.Create(context.Background(), "ext-nobucket", CreateOptions{ + MetadataStoreType: configstore.MetadataStoreKindExternal, + ExternalEndpoint: "h", + ExternalPasswordAWSSecret: "s", + DataStoreType: "external", + }) + if err == nil { + t.Fatal("expected error: external dataStore without a bucket name") + } +} + +func TestDucklingGetDoesNotInspectComposedBucketReadiness(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + ctx := context.Background() + + status := readyDucklingStatus("posthog-duckling-org-bucket-mw-prod-us") + addS3BucketRef(status, "posthog-duckling-org-bucket-mw-prod-us") + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, ducklingCR(ducklingName("org-bucket"), status), metav1.CreateOptions{}); err != nil { + t.Fatalf("seed Duckling CR: %v", err) + } + + got, err := dc.Get(ctx, "org-bucket") + if err != nil { + t.Fatalf("Get should only parse Duckling status: %v", err) + } + if got.SyncedFalseMessage != "" { + t.Fatalf("Get should not mutate status with composed Bucket errors, got %q", got.SyncedFalseMessage) + } +} + +func TestDucklingGetProvisioningStatusReportsBucketReadinessSource(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + ctx := context.Background() + bucketName := "posthog-duckling-org-bucket-mw-prod-us" + + cr := ducklingCR(ducklingName("org-bucket"), map[string]interface{}{ + "dataStore": map[string]interface{}{ + "type": "s3bucket", + "bucketName": bucketName, + "s3Region": "us-east-1", + }, + }) + addCrossplaneS3BucketRef(cr, bucketName) + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, cr, metav1.CreateOptions{}); err != nil { + t.Fatalf("seed Duckling CR: %v", err) + } + if _, err := fakeK8s.Resource(s3BucketGVR).Create(ctx, bucketCR(bucketName, false, "InvalidBucketName"), metav1.CreateOptions{}); err != nil { + t.Fatalf("seed Bucket CR: %v", err) + } + + status, err := dc.GetProvisioningStatus(ctx, "org-bucket") + if err != nil { + t.Fatalf("GetProvisioningStatus: %v", err) + } + if status.DataStoreReadiness.Ready { + t.Fatalf("expected DataStoreReadiness.Ready=false") + } + if status.DataStoreReadiness.Source != DataStoreReadinessSourceComposedBucket { + t.Fatalf("source = %q, want %q", status.DataStoreReadiness.Source, DataStoreReadinessSourceComposedBucket) + } + if !strings.Contains(status.DataStoreReadiness.Message, "InvalidBucketName") { + t.Fatalf("message = %q, want InvalidBucketName", status.DataStoreReadiness.Message) + } +} + +func TestDucklingEnsureDeletedReportsWhetherCRIsGone(t *testing.T) { + dc, fakeK8s := newFakeDucklingClient() + ctx := context.Background() + + if _, err := fakeK8s.Resource(ducklingGVR).Namespace(ducklingNamespace).Create(ctx, ducklingCR(ducklingName("org-delete"), nil), metav1.CreateOptions{}); err != nil { + t.Fatalf("seed Duckling CR: %v", err) + } + + gone, err := dc.EnsureDeleted(ctx, "org-delete") + if err != nil { + t.Fatalf("EnsureDeleted first call: %v", err) + } + if gone { + t.Fatal("first EnsureDeleted call should request deletion and report gone=false") + } + + gone, err = dc.EnsureDeleted(ctx, "org-delete") + if err != nil { + t.Fatalf("EnsureDeleted second call: %v", err) + } + if !gone { + t.Fatal("second EnsureDeleted call should report gone=true after CR is absent") + } +} + +func TestBucketResourceRefNameReadsCrossplaneResourceRefs(t *testing.T) { + for name, tc := range map[string]struct { + object map[string]interface{} + want string + }{ + "status.resourceRefs": { + object: map[string]interface{}{"status": map[string]interface{}{ + "resourceRefs": []interface{}{map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "name": "bucket-from-status-ref", + }}, + }}, + want: "bucket-from-status-ref", + }, + "spec.resourceRefs": { + object: map[string]interface{}{"spec": map[string]interface{}{ + "resourceRefs": []interface{}{map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "name": "bucket-from-spec-ref", + }}, + }}, + want: "bucket-from-spec-ref", + }, + "spec.crossplane.resourceRefs": { + object: map[string]interface{}{"spec": map[string]interface{}{ + "crossplane": map[string]interface{}{ + "resourceRefs": []interface{}{map[string]interface{}{ + "apiVersion": s3BucketAPIVersion, + "kind": crossplaneBucketKind, + "name": "bucket-from-crossplane-ref", + }}, + }, + }}, + want: "bucket-from-crossplane-ref", + }, + } { + t.Run(name, func(t *testing.T) { + cr := &unstructured.Unstructured{Object: tc.object} + if got := bucketResourceRefName(cr); got != tc.want { + t.Fatalf("bucketResourceRefName = %q, want %q", got, tc.want) + } + }) + } +} diff --git a/controlplane/provisioner/lakekeeper_k8s.go b/controlplane/provisioner/lakekeeper_k8s.go index e3b4a064..3023371e 100644 --- a/controlplane/provisioner/lakekeeper_k8s.go +++ b/controlplane/provisioner/lakekeeper_k8s.go @@ -79,13 +79,9 @@ func lakekeeperPodMetadata() map[string]interface{} { // scrape annotations) onto every existing Lakekeeper CR for the org, matched by // the duckgres/active-org label. // -// It deliberately does NOT recompute the CR name from the orgID. Post-#632, -// LakekeeperResourceName preserves hyphens, but a legacy org's CR — and its -// Secret, ServiceAccount, and EKS pod-identity, all derived from the no-hyphen -// Duckling XR name — keeps the de-hyphenated name. Looking up by label patches -// whatever name actually exists (no-dash for legacy orgs, hyphenated for new -// ones) instead of minting a duplicate CR under a name that has no matching -// Secret/SA/pod-identity. +// It deliberately does NOT recompute the CR name from the orgID. Looking up by +// label patches whatever name actually exists instead of minting a duplicate CR +// under a name that has no matching Secret/SA/pod-identity. // // Uses a JSON merge patch, which carries no resourceVersion, so it never races // the operator's frequent status writes (the "object has been modified" @@ -165,8 +161,7 @@ func NewLakekeeperK8sClientWithClients(dc dynamic.Interface, kc kubernetes.Inter } // LakekeeperResourceName derives the K8s resource name (CR + Secret + SA) for -// an org. Uses ducklingName so it preserves hyphens, matching the Duckling CR -// and the rest of the in-cluster resources. +// an org. func LakekeeperResourceName(orgID string) string { return "lakekeeper-" + ducklingName(orgID) } diff --git a/controlplane/provisioner/lakekeeper_k8s_test.go b/controlplane/provisioner/lakekeeper_k8s_test.go index 277deaf2..c75410b7 100644 --- a/controlplane/provisioner/lakekeeper_k8s_test.go +++ b/controlplane/provisioner/lakekeeper_k8s_test.go @@ -32,7 +32,6 @@ func newFakeLakekeeperClient() (*LakekeeperK8sClient, *dynamicfake.FakeDynamicCl } func TestLakekeeperResourceName(t *testing.T) { - // k8s resource names preserve hyphens (only lowercasing is applied). cases := map[string]string{ "acme": "lakekeeper-acme", "019e417b-18c4-7a41": "lakekeeper-019e417b-18c4-7a41", @@ -254,8 +253,8 @@ func TestEnsureCR_CreateAndShape(t *testing.T) { func TestPatchPodShape_LabelMatchedStripsLimits(t *testing.T) { c, dc, _ := newFakeLakekeeperClient() ctx := context.Background() - // Two CRs for one org under different names (legacy de-hyphenated + new - // hyphenated), both label-tagged; the first carries a stale limits block. + // Two CRs for one org under different historical names, both label-tagged; + // the first carries a stale limits block. names := []string{"lakekeeper-acme", "lakekeeper-a-c-m-e"} for i, name := range names { spec := map[string]interface{}{"replicas": int64(1)} diff --git a/controlplane/provisioner/lakekeeper_provisioner.go b/controlplane/provisioner/lakekeeper_provisioner.go index 1ff6bf19..005709cd 100644 --- a/controlplane/provisioner/lakekeeper_provisioner.go +++ b/controlplane/provisioner/lakekeeper_provisioner.go @@ -354,9 +354,7 @@ func (p *LakekeeperProvisioner) buildCRSpec(w *configstore.ManagedWarehouse, in // database / Secret / REST-warehouse pipeline and without resolving inputs. // // It matches CRs by the duckgres/active-org label rather than recomputing the -// name, so it patches whatever CR actually exists — important because a legacy -// org's CR keeps the de-hyphenated name (derived from the no-hyphen Duckling XR) -// while LakekeeperResourceName now preserves hyphens. See +// name, so it patches whatever CR actually exists. See // LakekeeperK8sClient.PatchPodShape for the merge-patch (conflict-free) details. func (p *LakekeeperProvisioner) PatchPodShape(ctx context.Context, orgID string) error { return p.k8s.PatchPodShape(ctx, orgID) @@ -532,8 +530,8 @@ func lakekeeperDBName(orgID string) string { return "lakekeeper_" + pgIdentSuffix(orgID) } -// lakekeeperWarehouseName and oauthClientID are free-form strings (the Iceberg -// REST warehouse name and the OAuth2 client_id), so they preserve hyphens. +// lakekeeperWarehouseName and oauthClientID preserve the org ID suffix used by +// the Lakekeeper CR/Secret/SA. func lakekeeperWarehouseName(orgID string) string { return "org-" + ducklingName(orgID) } diff --git a/controlplane/provisioner/naming_test.go b/controlplane/provisioner/naming_test.go index de51e612..fa5ee4ad 100644 --- a/controlplane/provisioner/naming_test.go +++ b/controlplane/provisioner/naming_test.go @@ -4,24 +4,18 @@ package provisioner import "testing" -// TestDucklingNamePreservesHyphens locks in the post-fix behavior: k8s/AWS -// resource names keep hyphens (only lowercasing is applied), and the transform -// is injective — the regression being the old de-hyphenation where "a-b" and -// "ab" both collapsed to "ab". -func TestDucklingNamePreservesHyphens(t *testing.T) { +func TestDucklingNamePreservesOrgID(t *testing.T) { for in, want := range map[string]string{ "ben-iceberg-cnpg": "ben-iceberg-cnpg", "Ben-Iceberg": "ben-iceberg", "team123": "team123", "f47ac10b-58cc-4372-a567-0e02b2c3d479": "f47ac10b-58cc-4372-a567-0e02b2c3d479", + "f47ac10b58cc4372a5670e02b2c3d479": "f47ac10b58cc4372a5670e02b2c3d479", } { if got := ducklingName(in); got != want { t.Errorf("ducklingName(%q) = %q, want %q", in, got, want) } } - if ducklingName("a-b") == ducklingName("ab") { - t.Error("ducklingName must not collide \"a-b\" with \"ab\" (the old de-hyphenation bug)") - } } // TestPgIdentSuffixSanitizes verifies the Postgres-identifier transform maps @@ -56,3 +50,16 @@ func TestLakekeeperNamesForHyphenatedOrg(t *testing.T) { t.Errorf("oauthClientID = %q", got) } } + +func TestLakekeeperNamesPreserveUUIDHyphens(t *testing.T) { + const org = "f47ac10b-58cc-4372-a567-0e02b2c3d479" + if got := LakekeeperResourceName(org); got != "lakekeeper-f47ac10b-58cc-4372-a567-0e02b2c3d479" { + t.Errorf("LakekeeperResourceName = %q", got) + } + if got := lakekeeperWarehouseName(org); got != "org-f47ac10b-58cc-4372-a567-0e02b2c3d479" { + t.Errorf("lakekeeperWarehouseName = %q", got) + } + if got := oauthClientID(org); got != "duckling-f47ac10b-58cc-4372-a567-0e02b2c3d479" { + t.Errorf("oauthClientID = %q", got) + } +} diff --git a/controlplane/provisioning/api.go b/controlplane/provisioning/api.go index 0f605292..cb7067e3 100644 --- a/controlplane/provisioning/api.go +++ b/controlplane/provisioning/api.go @@ -28,16 +28,10 @@ func isUniqueViolation(err error) bool { } // ducklingOrgIDPattern constrains org IDs that get a provisioned warehouse to a -// single DNS-1123 label (lowercase alphanumerics + hyphens, start/end -// alphanumeric). This is the shape every derived name needs: -// - the SNI prefix . is a single DNS label already; -// - the Duckling CR / IAM role / S3 bucket / Lakekeeper CR names use the org -// ID verbatim (lowercased), so it must be a valid k8s/AWS name; -// - the Postgres identifier maps any non-[a-z0-9_] char to '_', which is only -// injective when the source charset excludes everything but hyphens. -// -// Validating here keeps the org ID → resource-name mappings collision-free. -var ducklingOrgIDPattern = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`) +// canonical lowercase hyphenated UUID. The Crossplane composition may compact +// this ID for bucket-specific names; the strict source shape keeps that +// compaction collision-free. +var ducklingOrgIDPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) // Store defines the config store operations needed by the provisioning API. type Store interface { @@ -183,7 +177,7 @@ func (h *handler) provisionWarehouse(c *gin.Context) { } if !ducklingOrgIDPattern.MatchString(orgID) { - c.JSON(http.StatusBadRequest, gin.H{"error": "org id must be a DNS-1123 label (lowercase alphanumerics and hyphens, starting and ending alphanumeric) so the derived resource names are valid and collision-free"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "org id must be a canonical lowercase hyphenated UUID so derived resource names are valid and collision-free"}) return } diff --git a/controlplane/provisioning/api_test.go b/controlplane/provisioning/api_test.go index bd33f106..20ac961e 100644 --- a/controlplane/provisioning/api_test.go +++ b/controlplane/provisioning/api_test.go @@ -14,6 +14,8 @@ import ( "gorm.io/gorm" ) +const testOrgID = "f47ac10b-58cc-4372-a567-0e02b2c3d479" + type fakeStore struct { orgs map[string]*configstore.Org users map[configstore.OrgUserKey]string @@ -178,7 +180,7 @@ func newTestRouter(store Store) *gin.Engine { // cnpg-shard and external. func TestProvisionRejectsAurora(t *testing.T) { store := newFakeStore() - store.orgs["analytics"] = &configstore.Org{Name: "analytics"} + store.orgs[testOrgID] = &configstore.Org{Name: testOrgID} router := newTestRouter(store) body := []byte(`{ @@ -186,7 +188,7 @@ func TestProvisionRejectsAurora(t *testing.T) { "metadata_store": {"type": "aurora"}, "ducklake": {"enabled": true} }`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/analytics/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -194,7 +196,7 @@ func TestProvisionRejectsAurora(t *testing.T) { if rec.Code != http.StatusBadRequest { t.Fatalf("status = %d, want 400 (aurora removed): %s", rec.Code, rec.Body.String()) } - if store.warehouses["analytics"] != nil { + if store.warehouses[testOrgID] != nil { t.Fatal("aurora request must not create a warehouse") } } @@ -204,7 +206,7 @@ func TestProvisionAutoCreatesOrg(t *testing.T) { router := newTestRouter(store) body := []byte(`{"database_name": "test-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/new-org/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -212,10 +214,10 @@ func TestProvisionAutoCreatesOrg(t *testing.T) { if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusAccepted, rec.Body.String()) } - if _, ok := store.orgs["new-org"]; !ok { + if _, ok := store.orgs[testOrgID]; !ok { t.Fatal("expected org to be auto-created") } - if store.warehouses["new-org"] == nil { + if store.warehouses[testOrgID] == nil { t.Fatal("expected warehouse to be created") } } @@ -225,7 +227,7 @@ func TestProvisionRejectsEmptyBody(t *testing.T) { router := newTestRouter(store) body := []byte(`{}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/analytics/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -237,15 +239,15 @@ func TestProvisionRejectsEmptyBody(t *testing.T) { func TestProvisionRejectsExistingNonTerminal(t *testing.T) { store := newFakeStore() - store.orgs["analytics"] = &configstore.Org{Name: "analytics"} - store.warehouses["analytics"] = &configstore.ManagedWarehouse{ - OrgID: "analytics", + store.orgs[testOrgID] = &configstore.Org{Name: testOrgID} + store.warehouses[testOrgID] = &configstore.ManagedWarehouse{ + OrgID: testOrgID, State: configstore.ManagedWarehouseStateProvisioning, } router := newTestRouter(store) body := []byte(`{"database_name": "test-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/analytics/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -257,16 +259,16 @@ func TestProvisionRejectsExistingNonTerminal(t *testing.T) { func TestProvisionAllowsRetryAfterFailure(t *testing.T) { store := newFakeStore() - store.orgs["analytics"] = &configstore.Org{Name: "analytics"} - store.users[configstore.OrgUserKey{OrgID: "analytics", Username: "root"}] = "old-hash" - store.warehouses["analytics"] = &configstore.ManagedWarehouse{ - OrgID: "analytics", + store.orgs[testOrgID] = &configstore.Org{Name: testOrgID} + store.users[configstore.OrgUserKey{OrgID: testOrgID, Username: "root"}] = "old-hash" + store.warehouses[testOrgID] = &configstore.ManagedWarehouse{ + OrgID: testOrgID, State: configstore.ManagedWarehouseStateFailed, } router := newTestRouter(store) body := []byte(`{"database_name": "analytics-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/analytics/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -274,23 +276,23 @@ func TestProvisionAllowsRetryAfterFailure(t *testing.T) { if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusAccepted, rec.Body.String()) } - if store.warehouses["analytics"].MetadataStore.Kind != configstore.MetadataStoreKindCnpgShard { - t.Fatalf("expected cnpg-shard warehouse after retry, got kind %q", store.warehouses["analytics"].MetadataStore.Kind) + if store.warehouses[testOrgID].MetadataStore.Kind != configstore.MetadataStoreKindCnpgShard { + t.Fatalf("expected cnpg-shard warehouse after retry, got kind %q", store.warehouses[testOrgID].MetadataStore.Kind) } } func TestProvisionAllowsRetryAfterDeleted(t *testing.T) { store := newFakeStore() - store.orgs["analytics"] = &configstore.Org{Name: "analytics"} - store.users[configstore.OrgUserKey{OrgID: "analytics", Username: "root"}] = "old-hash" - store.warehouses["analytics"] = &configstore.ManagedWarehouse{ - OrgID: "analytics", + store.orgs[testOrgID] = &configstore.Org{Name: testOrgID} + store.users[configstore.OrgUserKey{OrgID: testOrgID, Username: "root"}] = "old-hash" + store.warehouses[testOrgID] = &configstore.ManagedWarehouse{ + OrgID: testOrgID, State: configstore.ManagedWarehouseStateDeleted, } router := newTestRouter(store) body := []byte(`{"database_name": "analytics-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/analytics/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -436,7 +438,7 @@ func TestProvisionTransactionRollsBackOnUserFailure(t *testing.T) { router := newTestRouter(store) body := []byte(`{"database_name": "team-7-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/7/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -447,14 +449,14 @@ func TestProvisionTransactionRollsBackOnUserFailure(t *testing.T) { // After rollback: no warehouse row, no user row, no Org row. // Retry would treat this as a brand-new provision. - if _, ok := store.warehouses["7"]; ok { - t.Errorf("expected warehouse to be rolled back, got %+v", store.warehouses["7"]) + if _, ok := store.warehouses[testOrgID]; ok { + t.Errorf("expected warehouse to be rolled back, got %+v", store.warehouses[testOrgID]) } - if _, ok := store.users[configstore.OrgUserKey{OrgID: "7", Username: "root"}]; ok { + if _, ok := store.users[configstore.OrgUserKey{OrgID: testOrgID, Username: "root"}]; ok { t.Errorf("expected user row to be absent after rollback") } - if _, ok := store.orgs["7"]; ok { - t.Errorf("expected org row to be rolled back, got %+v", store.orgs["7"]) + if _, ok := store.orgs[testOrgID]; ok { + t.Errorf("expected org row to be rolled back, got %+v", store.orgs[testOrgID]) } // Now clear the hook and retry — should succeed with a clean @@ -462,14 +464,14 @@ func TestProvisionTransactionRollsBackOnUserFailure(t *testing.T) { // retry). store.setProvisionUserFailHook(nil) rec2 := httptest.NewRecorder() - req2 := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/7/provision", bytes.NewReader(body)) + req2 := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req2.Header.Set("Content-Type", "application/json") router.ServeHTTP(rec2, req2) if rec2.Code != http.StatusAccepted { t.Fatalf("retry status = %d, want 202: %s", rec2.Code, rec2.Body.String()) } - if _, ok := store.warehouses["7"]; !ok { + if _, ok := store.warehouses[testOrgID]; !ok { t.Errorf("expected warehouse to be created on retry") } } @@ -495,12 +497,12 @@ func TestResetPasswordRequiresReadyWarehouse(t *testing.T) { func TestProvisionCnpgShard(t *testing.T) { store := newFakeStore() - store.orgs["shardco"] = &configstore.Org{Name: "shardco"} + store.orgs[testOrgID] = &configstore.Org{Name: testOrgID} router := newTestRouter(store) // cnpg-shard takes no sizing and auto-enables iceberg. body := []byte(`{"database_name": "shardco-db", "metadata_store": {"type": "cnpg-shard"}, "iceberg": {"enabled": true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/shardco/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -508,7 +510,7 @@ func TestProvisionCnpgShard(t *testing.T) { if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusAccepted, rec.Body.String()) } - w := store.warehouses["shardco"] + w := store.warehouses[testOrgID] if w == nil { t.Fatal("expected warehouse to be created") return @@ -535,7 +537,7 @@ func TestProvisionIcebergExternal(t *testing.T) { "data_store": {"type": "external", "bucket_name": "posthog-duckling-example", "region": "us-east-1"}, "iceberg": {"enabled": true} }`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/extice/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -543,7 +545,7 @@ func TestProvisionIcebergExternal(t *testing.T) { if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusAccepted, rec.Body.String()) } - w := store.warehouses["extice"] + w := store.warehouses[testOrgID] if w == nil { t.Fatal("expected warehouse to be created") return @@ -575,14 +577,14 @@ func TestProvisionRejectsNoCatalog(t *testing.T) { store := newFakeStore() router := newTestRouter(store) body := []byte(`{"database_name":"nc-db","metadata_store":{"type":"cnpg-shard"}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/ncco/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) if rec.Code != http.StatusBadRequest { t.Fatalf("status = %d, want 400: %s", rec.Code, rec.Body.String()) } - if _, ok := store.warehouses["ncco"]; ok { + if _, ok := store.warehouses[testOrgID]; ok { t.Error("warehouse must not be created with no catalog enabled") } } @@ -593,14 +595,14 @@ func TestProvisionCnpgDuckLakeAndIceberg(t *testing.T) { store := newFakeStore() router := newTestRouter(store) body := []byte(`{"database_name":"both-db","metadata_store":{"type":"cnpg-shard"},"ducklake":{"enabled":true},"iceberg":{"enabled":true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/bothco/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want 202: %s", rec.Code, rec.Body.String()) } - w := store.warehouses["bothco"] + w := store.warehouses[testOrgID] if w == nil || w.MetadataStore.Kind != configstore.MetadataStoreKindCnpgShard { t.Fatalf("expected cnpg-shard warehouse, got %+v", w) } @@ -623,7 +625,7 @@ func TestProvisionDuckLakeExternal(t *testing.T) { "data_store": {"type": "external", "bucket_name": "posthog-duckling-example", "region": "us-east-1"}, "ducklake": {"enabled": true} }`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/extdl/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -631,7 +633,7 @@ func TestProvisionDuckLakeExternal(t *testing.T) { if rec.Code != http.StatusAccepted { t.Fatalf("status = %d, want %d: %s", rec.Code, http.StatusAccepted, rec.Body.String()) } - w := store.warehouses["extdl"] + w := store.warehouses[testOrgID] if w == nil { t.Fatal("expected warehouse to be created") return @@ -653,14 +655,14 @@ func TestProvisionExternalRequiresEndpointAndSecret(t *testing.T) { t.Run(name, func(t *testing.T) { store := newFakeStore() router := newTestRouter(store) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/eco/provision", bytes.NewReader([]byte(body))) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader([]byte(body))) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) if rec.Code != http.StatusBadRequest { t.Fatalf("status = %d, want 400: %s", rec.Code, rec.Body.String()) } - if _, ok := store.warehouses["eco"]; ok { + if _, ok := store.warehouses[testOrgID]; ok { t.Error("warehouse must not be created when required external fields are missing") } }) @@ -672,7 +674,7 @@ func TestProvisionExternalDataStoreRequiresBucket(t *testing.T) { router := newTestRouter(store) // data_store.type=external without bucket_name → 400. body := []byte(`{"database_name":"e-db","ducklake":{"enabled":true},"metadata_store":{"type":"external","external":{"endpoint":"h","password_aws_secret":"s"}},"data_store":{"type":"external"}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/eco/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -686,7 +688,7 @@ func TestProvisionRejectsUnsupportedMetadataStore(t *testing.T) { router := newTestRouter(store) body := []byte(`{"database_name": "x-db", "metadata_store": {"type": "neon"}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/xco/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) @@ -697,7 +699,16 @@ func TestProvisionRejectsUnsupportedMetadataStore(t *testing.T) { } func TestProvisionRejectsInvalidOrgID(t *testing.T) { - for _, bad := range []string{"ben.iceberg", "Ben-Iceberg", "ben_iceberg", "-bad", "bad-"} { + for _, bad := range []string{ + "analytics", + "ben.iceberg", + "Ben-Iceberg", + "ben_iceberg", + "-bad", + "bad-", + "f47ac10b58cc4372a5670e02b2c3d479", + "f47ac10b-58cc-4372-a567-0e02b2c3d47z", + } { t.Run(bad, func(t *testing.T) { store := newFakeStore() router := newTestRouter(store) @@ -716,15 +727,15 @@ func TestProvisionRejectsInvalidOrgID(t *testing.T) { } } -func TestProvisionAcceptsHyphenatedOrgID(t *testing.T) { +func TestProvisionAcceptsCanonicalHyphenatedUUIDOrgID(t *testing.T) { store := newFakeStore() router := newTestRouter(store) body := []byte(`{"database_name":"d","metadata_store":{"type":"cnpg-shard"},"iceberg":{"enabled":true}}`) - req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/ben-iceberg-cnpg/provision", bytes.NewReader(body)) + req := httptest.NewRequest(http.MethodPost, "/api/v1/orgs/"+testOrgID+"/provision", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") rec := httptest.NewRecorder() router.ServeHTTP(rec, req) if rec.Code != http.StatusAccepted { - t.Fatalf("hyphenated org id should be accepted, got %d: %s", rec.Code, rec.Body.String()) + t.Fatalf("canonical hyphenated UUID org id should be accepted, got %d: %s", rec.Code, rec.Body.String()) } } diff --git a/docs/runbooks/managed-warehouse-deprovision.md b/docs/runbooks/managed-warehouse-deprovision.md index 3857ddb4..31c83e52 100644 --- a/docs/runbooks/managed-warehouse-deprovision.md +++ b/docs/runbooks/managed-warehouse-deprovision.md @@ -119,7 +119,7 @@ kubectl --context "$TARGET_CONTEXT" get managed -A | grep "$CR_NAME" || true ## Legacy Hyphenated CRs -Older Duckling CRs may have hyphens in `metadata.name`; current controller code strips hyphens for newer CRs to keep AWS resource names under length limits. If only the hyphenated CR exists, normal deprovisioning may miss it. For urgent cleanup, snapshot and delete the exact existing CR: +UUID-shaped Duckling CRs may render compact dehyphenated bucket names so composed S3 buckets fit AWS length limits. Manual cleanup should still snapshot and delete the exact existing Duckling CR: ```bash kubectl --context "$TARGET_CONTEXT" -n ducklings get ducklings.k8s.posthog.com "$CR_NAME" -o yaml > "/tmp/$ORG-duckling.yaml"