Skip to content

Commit daeb238

Browse files
committed
feat: postgres database resource
Adds a new `PostgresDatabaseResource` that moves the Postgres database creation logic later in the database creation process. Now, the `InstanceResource` is only responsible for creating the database users, and the `PostgresDatabaseResource` is responsible for creating the Postgres database, granting role privileges on the new database, and initializing the Spock node. This change is necessary because we need to use `pg_service.conf` for systemd support, and we need that file to exist when we create the Spock node. This change defers the Spock node initialization until after all of the instances are created and all nodes are ready, so we're able to construct the DSNs for all nodes before we initialize Spock. An added benefit of this change is that it opens up the possibility of supporting multiple Postgres databases per Control Plane database, since we're able to create multiple databases/Spock nodes per `NodeResource`/Patroni cluster. I've added the `DatabaseName` field to most, if not all, of the resources that would need to change to accommodate this. However, I did stop short of incorporating the database name into these resources' identities, which will be necessary to complete that multi-database support. PLAT-417
1 parent 9df990e commit daeb238

64 files changed

Lines changed: 1542 additions & 630 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
{
22
"profiling_enabled": true,
3-
"client_addresses": ["127.0.0.1"]
3+
"client_addresses": ["127.0.0.1"],
4+
"logging": {
5+
"component_levels": {
6+
"api_server": "error"
7+
}
8+
}
49
}

server/internal/database/instance_resource.go

Lines changed: 5 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package database
22

33
import (
44
"context"
5-
"crypto/tls"
65
"errors"
76
"fmt"
87
"slices"
@@ -16,7 +15,6 @@ import (
1615
"github.com/pgEdge/control-plane/server/internal/patroni"
1716
"github.com/pgEdge/control-plane/server/internal/postgres"
1817
"github.com/pgEdge/control-plane/server/internal/resource"
19-
"github.com/pgEdge/control-plane/server/internal/utils"
2018
)
2119

2220
var _ resource.Resource = (*InstanceResource)(nil)
@@ -119,6 +117,10 @@ func (r *InstanceResource) Delete(ctx context.Context, rc *resource.Context) err
119117
}
120118

121119
func (r *InstanceResource) Connection(ctx context.Context, rc *resource.Context, dbName string) (*pgx.Conn, error) {
120+
if rc.HostID != r.Spec.HostID {
121+
return nil, fmt.Errorf("cannot connect to an instance running on a different host. executing host = '%s', instance host = '%s'", rc.HostID, r.Spec.HostID)
122+
}
123+
122124
certs, err := do.Invoke[*certificates.Service](rc.Injector)
123125
if err != nil {
124126
return nil, err
@@ -175,77 +177,17 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
175177
return fmt.Errorf("failed to get TLS config: %w", err)
176178
}
177179

178-
firstTimeSetup, err := r.isFirstTimeSetup(rc)
179-
if err != nil {
180-
return err
181-
}
182-
183-
var spockSets []postgres.ReplicationSet
184-
var spockTables []postgres.ReplicationSetTable
185-
if r.Spec.RestoreConfig != nil && firstTimeSetup {
186-
err = r.renameDB(ctx, tlsCfg)
187-
if err != nil {
188-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
189-
}
190-
191-
spockSets, spockTables, err = r.backupReplicationSets(ctx, tlsCfg)
192-
if err != nil {
193-
return err
194-
}
195-
196-
err = r.dropSpock(ctx, tlsCfg)
197-
if err != nil {
198-
return fmt.Errorf("failed to drop spock: %w", err)
199-
}
200-
}
201-
202-
err = r.createDB(ctx, tlsCfg)
203-
if err != nil {
204-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
205-
}
206-
207180
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
208-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
181+
DSN: r.ConnectionInfo.AdminDSN("postgres"),
209182
TLS: tlsCfg,
210183
})
211184
if err != nil {
212185
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
213186
}
214187
defer conn.Close(ctx)
215188

216-
tx, err := conn.Begin(ctx)
217-
if err != nil {
218-
return fmt.Errorf("failed to begin transaction: %w", err)
219-
}
220-
defer tx.Rollback(ctx)
221-
222-
enabled, err := postgres.IsSpockEnabled().Scalar(ctx, tx)
223-
if err != nil {
224-
return fmt.Errorf("failed to check if spock is enabled: %w", err)
225-
}
226-
227-
if enabled {
228-
err = postgres.EnableRepairMode().Exec(ctx, tx)
229-
if err != nil {
230-
return fmt.Errorf("failed to enable repair mode: %w", err)
231-
}
232-
}
233-
234-
err = postgres.InitializePgEdgeExtensions(
235-
r.Spec.NodeName,
236-
r.ConnectionInfo.PeerDSN(r.Spec.DatabaseName),
237-
).Exec(ctx, conn)
238-
if err != nil {
239-
return fmt.Errorf("failed to initialize pgedge extensions: %w", err)
240-
}
241-
if len(spockSets) > 0 || len(spockTables) > 0 {
242-
if err := postgres.RestoreReplicationSets(spockSets, spockTables).Exec(ctx, conn); err != nil {
243-
return fmt.Errorf("failed to restore spock metadata: %w", err)
244-
}
245-
}
246189
roleStatements, err := postgres.CreateBuiltInRoles(postgres.BuiltinRoleOptions{
247190
PGVersion: r.Spec.PgEdgeVersion.PostgresVersion.String(),
248-
DBName: r.Spec.DatabaseName,
249191
})
250192
if err != nil {
251193
return fmt.Errorf("failed to generate built-in role statements: %w", err)
@@ -258,8 +200,6 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
258200
statement, err := postgres.CreateUserRole(postgres.UserRoleOptions{
259201
Name: user.Username,
260202
Password: user.Password,
261-
DBName: r.Spec.DatabaseName,
262-
DBOwner: user.DBOwner,
263203
Attributes: user.Attributes,
264204
Roles: user.Roles,
265205
})
@@ -271,10 +211,6 @@ func (r *InstanceResource) initializeInstance(ctx context.Context, rc *resource.
271211
}
272212
}
273213

274-
if err := tx.Commit(ctx); err != nil {
275-
return fmt.Errorf("failed to commit transaction: %w", err)
276-
}
277-
278214
err = r.updateInstanceState(ctx, rc, &InstanceUpdateOptions{State: InstanceStateAvailable})
279215
if err != nil {
280216
return r.recordError(ctx, rc, err)
@@ -334,109 +270,3 @@ func (r *InstanceResource) updateConnectionInfo(ctx context.Context, rc *resourc
334270
func (r *InstanceResource) patroniClient() *patroni.Client {
335271
return patroni.NewClient(r.ConnectionInfo.PatroniURL(), nil)
336272
}
337-
338-
func (r *InstanceResource) createDB(ctx context.Context, tlsCfg *tls.Config) error {
339-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
340-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
341-
TLS: tlsCfg,
342-
})
343-
if err != nil {
344-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
345-
}
346-
defer createDBConn.Close(ctx)
347-
348-
err = postgres.CreateDatabase(r.Spec.DatabaseName).Exec(ctx, createDBConn)
349-
if err != nil {
350-
return fmt.Errorf("failed to create database %q: %w", r.Spec.DatabaseName, err)
351-
}
352-
353-
return nil
354-
}
355-
356-
func (r *InstanceResource) renameDB(ctx context.Context, tlsCfg *tls.Config) error {
357-
// Short circuit if the restore config doesn't include a dbname or if the
358-
// database name is the same.
359-
if r.Spec.RestoreConfig.SourceDatabaseName == "" || r.Spec.RestoreConfig.SourceDatabaseName == r.Spec.DatabaseName {
360-
return nil
361-
}
362-
363-
// This operation can be flaky because of other processes connected to the
364-
// database. We retry it a few times to avoid failing the entire create
365-
// operation.
366-
err := utils.Retry(3, 500*time.Millisecond, func() error {
367-
createDBConn, err := ConnectToInstance(ctx, &ConnectionOptions{
368-
DSN: r.ConnectionInfo.AdminDSN("postgres"),
369-
TLS: tlsCfg,
370-
})
371-
if err != nil {
372-
return fmt.Errorf("failed to connect to 'postgres' database on instance: %w", err)
373-
}
374-
defer createDBConn.Close(ctx)
375-
376-
return postgres.
377-
RenameDB(r.Spec.RestoreConfig.SourceDatabaseName, r.Spec.DatabaseName).
378-
Exec(ctx, createDBConn)
379-
})
380-
if err != nil {
381-
return fmt.Errorf("failed to rename database %q: %w", r.Spec.DatabaseName, err)
382-
}
383-
384-
return nil
385-
}
386-
387-
func (r *InstanceResource) dropSpock(ctx context.Context, tlsCfg *tls.Config) error {
388-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
389-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
390-
TLS: tlsCfg,
391-
})
392-
if err != nil {
393-
return fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
394-
}
395-
defer conn.Close(ctx)
396-
397-
err = postgres.DropSpockAndCleanupSlots(r.Spec.DatabaseName).Exec(ctx, conn)
398-
if err != nil {
399-
return fmt.Errorf("failed to drop spock: %w", err)
400-
}
401-
402-
return nil
403-
}
404-
405-
func (r *InstanceResource) isFirstTimeSetup(rc *resource.Context) (bool, error) {
406-
// This instance will already exist in the state if it's been successfully
407-
// created before.
408-
_, err := resource.FromContext[*InstanceResource](rc, r.Identifier())
409-
if errors.Is(err, resource.ErrNotFound) {
410-
return true, nil
411-
} else if err != nil {
412-
return false, fmt.Errorf("failed to check state for previous version of this instance: %w", err)
413-
}
414-
415-
return false, nil
416-
}
417-
418-
func (r *InstanceResource) backupReplicationSets(
419-
ctx context.Context,
420-
tlsCfg *tls.Config,
421-
) ([]postgres.ReplicationSet, []postgres.ReplicationSetTable, error) {
422-
conn, err := ConnectToInstance(ctx, &ConnectionOptions{
423-
DSN: r.ConnectionInfo.AdminDSN(r.Spec.DatabaseName),
424-
TLS: tlsCfg,
425-
})
426-
if err != nil {
427-
return nil, nil, fmt.Errorf("failed to connect to database %q: %w", r.Spec.DatabaseName, err)
428-
}
429-
defer conn.Close(ctx)
430-
431-
sets, err := postgres.GetReplicationSets().Structs(ctx, conn)
432-
if err != nil {
433-
return nil, nil, fmt.Errorf("spock backup failed to get replication sets: %w", err)
434-
}
435-
436-
tabs, err := postgres.GetReplicationSetTables().Structs(ctx, conn)
437-
if err != nil {
438-
return nil, nil, fmt.Errorf("spock backup failed to get replication set tables: %w", err)
439-
}
440-
441-
return sets, tabs, nil
442-
}

server/internal/database/lag_tracker_commit_ts_resource.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type LagTrackerCommitTimestampResource struct {
2626
// Planner fields
2727
OriginNode string `json:"origin_node"`
2828
ReceiverNode string `json:"receiver_node"`
29+
DatabaseName string `json:"database_name"`
2930

3031
// Dependency wiring
3132
ExtraDependencies []resource.Identifier `json:"dependent_resources,omitempty"`
@@ -56,7 +57,8 @@ func (r *LagTrackerCommitTimestampResource) Identifier() resource.Identifier {
5657

5758
func (r *LagTrackerCommitTimestampResource) Dependencies() []resource.Identifier {
5859
deps := []resource.Identifier{
59-
NodeResourceIdentifier(r.ReceiverNode),
60+
PostgresDatabaseResourceIdentifier(r.ReceiverNode, r.DatabaseName),
61+
PostgresDatabaseResourceIdentifier(r.OriginNode, r.DatabaseName),
6062
}
6163
deps = append(deps, r.ExtraDependencies...)
6264
return deps

server/internal/database/node_resource.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ package database
33
import (
44
"context"
55
"fmt"
6+
"slices"
7+
"strings"
68

9+
"github.com/pgEdge/control-plane/server/internal/postgres"
710
"github.com/pgEdge/control-plane/server/internal/resource"
811
)
912

@@ -83,3 +86,62 @@ func (n *NodeResource) Update(ctx context.Context, rc *resource.Context) error {
8386
func (n *NodeResource) Delete(ctx context.Context, rc *resource.Context) error {
8487
return nil
8588
}
89+
90+
func (n *NodeResource) DSN(
91+
ctx context.Context,
92+
rc *resource.Context,
93+
fromInstance *InstanceResource,
94+
dbName string,
95+
) (*postgres.DSN, error) {
96+
// Sort the instances so that our final DSN is deterministic
97+
instanceIDs := slices.Clone(n.InstanceIDs)
98+
slices.SortFunc(instanceIDs, func(a, b string) int {
99+
// Always sort the primary instance to the beginning of the list since
100+
// these DSNs are used for subscriptions, so we'll want to try
101+
// connecting to the known primary instance first.
102+
switch {
103+
case a == n.PrimaryInstanceID:
104+
return -1
105+
case b == n.PrimaryInstanceID:
106+
return 1
107+
default:
108+
return strings.Compare(a, b)
109+
}
110+
})
111+
112+
hosts := make([]string, len(instanceIDs))
113+
ports := make([]int, len(instanceIDs))
114+
for i, instanceID := range instanceIDs {
115+
instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(instanceID))
116+
if err != nil {
117+
return nil, fmt.Errorf("failed to get instance '%s': %w", instanceID, err)
118+
}
119+
hosts[i] = instance.ConnectionInfo.PeerHost
120+
ports[i] = instance.ConnectionInfo.PeerPort
121+
}
122+
123+
return &postgres.DSN{
124+
Hosts: hosts,
125+
Ports: ports,
126+
DBName: dbName,
127+
User: "pgedge",
128+
SSLCert: fromInstance.ConnectionInfo.PeerSSLCert,
129+
SSLKey: fromInstance.ConnectionInfo.PeerSSLKey,
130+
SSLRootCert: fromInstance.ConnectionInfo.PeerSSLRootCert,
131+
Extra: map[string]string{
132+
"target_session_attrs": "primary",
133+
},
134+
}, nil
135+
}
136+
137+
func (n *NodeResource) PrimaryInstance(ctx context.Context, rc *resource.Context) (*InstanceResource, error) {
138+
if n.PrimaryInstanceID == "" {
139+
return nil, fmt.Errorf("%w: primary instance id not set", resource.ErrNotFound)
140+
}
141+
instance, err := resource.FromContext[*InstanceResource](rc, InstanceResourceIdentifier(n.PrimaryInstanceID))
142+
if err != nil {
143+
return nil, fmt.Errorf("failed to get primary instance '%s': %w", n.PrimaryInstanceID, err)
144+
}
145+
146+
return instance, nil
147+
}

server/internal/database/operations/add_nodes.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package operations
33
import (
44
"fmt"
55

6-
"github.com/pgEdge/control-plane/server/internal/database"
76
"github.com/pgEdge/control-plane/server/internal/resource"
87
)
98

@@ -17,14 +16,12 @@ func AddNode(node *NodeResources) ([]*resource.State, error) {
1716
return nil, fmt.Errorf("got empty instances for node %s", node.NodeName)
1817
}
1918

20-
instanceIDs := make([]string, 0, len(node.InstanceResources))
2119
states := make([]*resource.State, 0, 2)
2220

2321
primary, err := instanceState(node.InstanceResources[0])
2422
if err != nil {
2523
return nil, err
2624
}
27-
instanceIDs = append(instanceIDs, node.InstanceResources[0].InstanceID())
2825
states = append(states, primary)
2926

3027
var replicas *resource.State
@@ -38,20 +35,17 @@ func AddNode(node *NodeResources) ([]*resource.State, error) {
3835
} else {
3936
replicas.Merge(replica)
4037
}
41-
instanceIDs = append(instanceIDs, inst.InstanceID())
4238
}
4339

4440
if replicas != nil {
4541
states = append(states, replicas)
4642
}
4743

48-
err = addNodeResource(states, &database.NodeResource{
49-
Name: node.NodeName,
50-
InstanceIDs: instanceIDs,
51-
})
44+
nodeState, err := node.nodeResourceState()
5245
if err != nil {
5346
return nil, err
5447
}
48+
states[len(states)-1].Merge(nodeState)
5549

5650
return states, nil
5751
}

0 commit comments

Comments
 (0)