Skip to content

Commit 4e0f475

Browse files
committed
feat: postgres database resource
Adds a new `PostgresDatabaseResource` that shifts the Postgres database creation logic to happen later in the database creation process. Now, the `InstanceResource` is only responsible for creating the database users, and the `PostgresDatabase` resource is responsible for creating the Postgres database, granting role privileges to the new database, and initializing the Spock node. This change is necessary because we need to use `pg_service.conf` for SystemD support, and we need that file to exist when we create the Spock node. This change shifts the Spock node initialization until after all of the instances are created and all nodes are ready, so we're able to construct the DSNs for all nodes before we initialize Spock. This change has an added benefit that it opens up the possibility of support for multiple Postgres databases per Control Plane database since we're able to make multiple databases/spock nodes per `NodeResource`/Patroni cluster. I've added the `DatabaseName` field to most, if not all, of the resources that would need to change to accommodate this. However, I did stop short of incorporating the database name into these resource's identities, which will be necessary to complete that multi-database support. PLAT-417
1 parent 53593e9 commit 4e0f475

66 files changed

Lines changed: 1629 additions & 633 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
{
22
"profiling_enabled": true,
3-
"client_addresses": ["127.0.0.1"]
3+
"client_addresses": ["127.0.0.1"],
4+
"logging": {
5+
"component_levels": {
6+
"api_server": "error"
7+
}
8+
}
49
}

server/internal/api/apiv1/validate.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ func validateDatabaseSpec(spec *api.DatabaseSpec) error {
6767
errs = append(errs, validateCPUs(spec.Cpus, []string{"cpus"})...)
6868
errs = append(errs, validateMemory(spec.Memory, []string{"memory"})...)
6969
errs = append(errs, validatePorts(spec.Port, spec.PatroniPort, []string{"port"}))
70+
errs = append(errs, validateUsers(spec.DatabaseUsers, []string{"database_users"})...)
7071

7172
// Track node-name uniqueness and prepare set for cross-node checks.
7273
seenNodeNames := make(ds.Set[string], len(spec.Nodes))
@@ -344,6 +345,33 @@ func validatePorts(postgresPort, patroniPort *int, path []string) error {
344345
return nil
345346
}
346347

348+
func validateUsers(users []*api.DatabaseUserSpec, path []string) []error {
349+
var errs []error
350+
351+
seenNames := ds.NewSet[string]()
352+
var hasOwner bool
353+
for i, user := range users {
354+
userPath := appendPath(path, arrayIndexPath(i))
355+
356+
if seenNames.Has(user.Username) {
357+
err := errors.New("usernames must be unique within a database")
358+
errs = append(errs, newValidationError(err, userPath))
359+
}
360+
if hasOwner {
361+
err := errors.New("cannot have multiple users with db_owner = true")
362+
errs = append(errs, newValidationError(err, userPath))
363+
}
364+
365+
seenNames.Add(user.Username)
366+
367+
if user.DbOwner != nil && *user.DbOwner {
368+
hasOwner = true
369+
}
370+
}
371+
372+
return errs
373+
}
374+
347375
func validateBackupConfig(cfg *api.BackupConfigSpec, path []string) []error {
348376
var errs []error
349377

server/internal/api/apiv1/validate_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -741,6 +741,39 @@ func TestValidateDatabaseSpec(t *testing.T) {
741741
"nodes[1].port: postgres and patroni ports must not conflict",
742742
},
743743
},
744+
{
745+
name: "invalid users",
746+
spec: &api.DatabaseSpec{
747+
DatabaseUsers: []*api.DatabaseUserSpec{
748+
{
749+
Username: "duplicate_user",
750+
},
751+
{
752+
Username: "duplicate_user",
753+
},
754+
{
755+
Username: "duplicate_owner_1",
756+
DbOwner: utils.PointerTo(true),
757+
},
758+
{
759+
Username: "duplicate_owner_2",
760+
DbOwner: utils.PointerTo(true),
761+
},
762+
},
763+
Nodes: []*api.DatabaseNodeSpec{
764+
{
765+
Name: "n1",
766+
HostIds: []api.Identifier{
767+
api.Identifier("host-1"),
768+
},
769+
},
770+
},
771+
},
772+
expected: []string{
773+
"users[1]: usernames must be unique within a database",
774+
"users[3]: cannot have multiple users with db_owner = true",
775+
},
776+
},
744777
} {
745778
t.Run(tc.name, func(t *testing.T) {
746779
err := validateDatabaseSpec(tc.spec)

server/internal/database/instance_resource.go

Lines changed: 17 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package database
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"fmt"
87
"slices"
@@ -16,7 +15,6 @@ import (
1615
"github.com/pgEdge/control-plane/server/internal/patroni"
1716
"github.com/pgEdge/control-plane/server/internal/postgres"
1817
"github.com/pgEdge/control-plane/server/internal/resource"
19-
"github.com/pgEdge/control-plane/server/internal/utils"
2018
)
2119

2220
var _ resource.Resource = (*InstanceResource)(nil)
@@ -122,6 +120,10 @@ func (r *InstanceResource) Delete(ctx context.Context, rc *resource.Context) err
122120
}
123121

124122
func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, dbName string) (*pgx.Conn, error) {
123+
if rc.HostID != r.Spec.HostID {
124+
return nil, fmt.Errorf("cannot connect to an instance running on a different host. executing host = '%s', instance host = '%s'", rc.HostID, r.Spec.HostID)
125+
}
126+
125127
certs, err := do.Invoke[*certificates.Service](rc.Injector)
126128
if err != nil {
127129
return nil, err
@@ -143,17 +145,17 @@ func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context,
143145
}
144146

145147
func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.Context) error {
146-
certs, err := do.Invoke[*certificates.Service](rc.Injector)
147-
if err != nil {
148-
return err
149-
}
148+
// certs, err := do.Invoke[*certificates.Service](rc.Injector)
149+
// if err != nil {
150+
// return err
151+
// }
150152

151153
if err := r.updateConnectionInfo(ctx, rc); err != nil {
152154
return err
153155
}
154156

155157
patroniClient := r.patroniClient()
156-
err = WaitForPatroniRunning(ctx, patroniClient, 0)
158+
err := WaitForPatroniRunning(ctx, patroniClient, 0)
157159
if err != nil {
158160
return fmt.Errorf("failed to wait for patroni to enter running state: %w", err)
159161
}
@@ -173,98 +175,41 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
173175
return nil
174176
}
175177

176-
tlsCfg, err := certs.PostgresUserTLS(ctx, r.Spec.InstanceID, r.InstanceHostname, "pgedge")
177-
if err != nil {
178-
return fmt.Errorf("failed to get TLS config: %w", err)
179-
}
180-
181-
var spockSets []postgres.ReplicationSet
182-
var spockTables []postgres.ReplicationSetTable
183-
if r.Spec.RestoreConfig != nil && r.isFirstTimeSetup(rc) {
184-
err = r.renameDB(ctx, tlsCfg)
185-
if err != nil {
186-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
187-
}
188-
189-
spockSets, spockTables, err = r.backupReplicationSets(ctx, tlsCfg)
190-
if err != nil {
191-
return err
192-
}
193-
194-
err = r.dropSpock(ctx, tlsCfg)
195-
if err != nil {
196-
return fmt.Errorf("failed to drop spock: %w", err)
197-
}
198-
}
199-
200-
err = r.createDB(ctx, tlsCfg)
178+
conn, err := r.Connection(ctx, rc, "postgres")
201179
if err != nil {
202-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
203-
}
204-
205-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
206-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
207-
TLS: tlsCfg,
208-
})
209-
if err != nil {
210-
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
180+
return err
211181
}
212182
defer conn.Close(ctx)
213183

214-
tx, err := conn.Begin(ctx)
184+
// Spock shouldn't exist in the 'postgres' database, but we want to err on
185+
// the side of caution.
186+
tx, err := postgres.StartRepairModeTxn(ctx, conn)
215187
if err != nil {
216-
return fmt.Errorf("failed to begin transaction: %w", err)
188+
return fmt.Errorf("failed to start repair mode transaction: %w", err)
217189
}
218190
defer tx.Rollback(ctx)
219191

220-
enabled, err := postgres.IsSpockEnabled().Scalar(ctx, tx)
221-
if err != nil {
222-
return fmt.Errorf("failed to check if spock is enabled: %w", err)
223-
}
224-
225-
if enabled {
226-
err = postgres.EnableRepairMode().Exec(ctx, tx)
227-
if err != nil {
228-
return fmt.Errorf("failed to enable repair mode: %w", err)
229-
}
230-
}
231-
232-
err = postgres.InitializePgEdgeExtensions(
233-
r.Spec.NodeName,
234-
r.ConnectionInfo.PeerDSN(r.Spec.DatabaseName),
235-
).Exec(ctx, conn)
236-
if err != nil {
237-
return fmt.Errorf("failed to initialize pgedge extensions: %w", err)
238-
}
239-
if len(spockSets) > 0 || len(spockTables) > 0 {
240-
if err := postgres.RestoreReplicationSets(spockSets, spockTables).Exec(ctx, conn); err != nil {
241-
return fmt.Errorf("failed to restore spock metadata: %w", err)
242-
}
243-
}
244192
roleStatements, err := postgres.CreateBuiltInRoles(postgres.BuiltinRoleOptions{
245193
PGVersion: r.Spec.PgEdgeVersion.PostgresVersion.String(),
246-
DBName: r.Spec.DatabaseName,
247194
})
248195
if err != nil {
249196
return fmt.Errorf("failed to generate built-in role statements: %w", err)
250197
}
251-
if err := roleStatements.Exec(ctx, conn); err != nil {
198+
if err := roleStatements.Exec(ctx, tx); err != nil {
252199
return fmt.Errorf("failed to create built-in roles: %w", err)
253200
}
254201

255202
for _, user := range r.Spec.DatabaseUsers {
256203
statement, err := postgres.CreateUserRole(postgres.UserRoleOptions{
257204
Name: user.Username,
258205
Password: user.Password,
259-
DBName: r.Spec.DatabaseName,
260-
DBOwner: user.DBOwner,
261206
Attributes: user.Attributes,
262207
Roles: user.Roles,
263208
})
264209
if err != nil {
265210
return fmt.Errorf("failed to produce create user role statement %q: %w", user.Username, err)
266211
}
267-
if err := statement.Exec(ctx, conn); err != nil {
212+
if err := statement.Exec(ctx, tx); err != nil {
268213
return fmt.Errorf("failed to create user role %q: %w", user.Username, err)
269214
}
270215
}
@@ -332,103 +277,3 @@ func (r *InstanceResource) updateConnectionInfo(ctx context.Context, rc *resourc
332277
func (r *InstanceResource) patroniClient() *patroni.Client {
333278
return patroni.NewClient(r.ConnectionInfo.PatroniURL(), nil)
334279
}
335-
336-
func (r *InstanceResource) createDB(ctx context.Context, tlsCfg *tls.Config) error {
337-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
338-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
339-
TLS: tlsCfg,
340-
})
341-
if err != nil {
342-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
343-
}
344-
defer createDBConn.Close(ctx)
345-
346-
err = postgres.CreateDatabase(r.Spec.DatabaseName).Exec(ctx, createDBConn)
347-
if err != nil {
348-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
349-
}
350-
351-
return nil
352-
}
353-
354-
func (r *InstanceResource) renameDB(ctx context.Context, tlsCfg *tls.Config) error {
355-
// Short circuit if the restore config doesn't include a dbname or if the
356-
// database name is the same.
357-
if r.Spec.RestoreConfig.SourceDatabaseName == "" || r.Spec.RestoreConfig.SourceDatabaseName == r.Spec.DatabaseName {
358-
return nil
359-
}
360-
361-
// This operation can be flaky because of other processes connected to the
362-
// database. We retry it a few times to avoid failing the entire create
363-
// operation.
364-
err := utils.Retry(3, 500*time.Millisecond, func() error {
365-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
366-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
367-
TLS: tlsCfg,
368-
})
369-
if err != nil {
370-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
371-
}
372-
defer createDBConn.Close(ctx)
373-
374-
return postgres.
375-
RenameDB(r.Spec.RestoreConfig.SourceDatabaseName, r.Spec.DatabaseName).
376-
Exec(ctx, createDBConn)
377-
})
378-
if err != nil {
379-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
380-
}
381-
382-
return nil
383-
}
384-
385-
func (r *InstanceResource) dropSpock(ctx context.Context, tlsCfg *tls.Config) error {
386-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
387-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
388-
TLS: tlsCfg,
389-
})
390-
if err != nil {
391-
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
392-
}
393-
defer conn.Close(ctx)
394-
395-
err = postgres.DropSpockAndCleanupSlots(r.Spec.DatabaseName).Exec(ctx, conn)
396-
if err != nil {
397-
return fmt.Errorf("failed to drop spock: %w", err)
398-
}
399-
400-
return nil
401-
}
402-
403-
func (r *InstanceResource) isFirstTimeSetup(rc *resource.Context) bool {
404-
// This instance will already exist in the state if it's been successfully
405-
// created before.
406-
_, ok := rc.State.Get(r.Identifier())
407-
return !ok
408-
}
409-
410-
func (r *InstanceResource) backupReplicationSets(
411-
ctx context.Context,
412-
tlsCfg *tls.Config,
413-
) ([]postgres.ReplicationSet, []postgres.ReplicationSetTable, error) {
414-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
415-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
416-
TLS: tlsCfg,
417-
})
418-
if err != nil {
419-
return nil, nil, fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
420-
}
421-
defer conn.Close(ctx)
422-
423-
sets, err := postgres.GetReplicationSets().Structs(ctx, conn)
424-
if err != nil {
425-
return nil, nil, fmt.Errorf("spock backup failed to get replication sets: %w", err)
426-
}
427-
428-
tabs, err := postgres.GetReplicationSetTables().Structs(ctx, conn)
429-
if err != nil {
430-
return nil, nil, fmt.Errorf("spock backup failed to get replication set tables: %w", err)
431-
}
432-
433-
return sets, tabs, nil
434-
}

server/internal/database/lag_tracker_commit_ts_resource.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type LagTrackerCommitTimestampResource struct {
2626
// Planner fields
2727
OriginNode string `json:"origin_node"`
2828
ReceiverNode string `json:"receiver_node"`
29+
DatabaseName string `json:"database_name"`
2930

3031
// Dependency wiring
3132
ExtraDependencies []resource.Identifier `json:"dependent_resources,omitempty"`
@@ -56,7 +57,8 @@ func (r *LagTrackerCommitTimestampResource) Identifier() resource.Identifier {
5657

5758
func (r *LagTrackerCommitTimestampResource) Dependencies() []resource.Identifier {
5859
deps := []resource.Identifier{
59-
NodeResourceIdentifier(r.ReceiverNode),
60+
PostgresDatabaseResourceIdentifier(r.ReceiverNode, r.DatabaseName),
61+
PostgresDatabaseResourceIdentifier(r.OriginNode, r.DatabaseName),
6062
}
6163
deps = append(deps, r.ExtraDependencies...)
6264
return deps

0 commit comments

Comments
 (0)