Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type Config struct {
GrpcListenAddress string `env:"GRPC_LISTEN_ADDRESS,default=0.0.0.0:8080"`
GrpcWebListenAddress string `env:"GRPC_WEB_LISTEN_ADDRESS,default=0.0.0.0:8081"`
SQLiteDirPath string `env:"SQLITE_DIR_PATH,default=db"`
PgDatabaseUrl string `env:"DATABASE_URL"`
PgRuntimeUrl string `env:"DATABASE_RUNTIME_URL"`
PgMigrationUrl string `env:"DATABASE_MIGRATION_URL"`
PgMaxConns int32 `env:"DATABASE_MAX_CONNS"`
CACert *Certificate `env:"CA_CERT"`
CRLUrl string `env:"CRL_URL"`
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_records_user_revision;
1 change: 1 addition & 0 deletions store/postgres/migrations/3_records_revision_index.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_records_user_revision ON records (user_id, revision);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason to change this index? I'm asking because the query performance advisor doesn't flag this as a suggestion. It does suggest these for about 5% performance gain.

CREATE INDEX ON public.records USING btree (user_id);

and

CREATE INDEX ON public.user_revisions USING btree (user_id);

30 changes: 25 additions & 5 deletions store/postgres/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ type PgSyncStorage struct {
db *pgxpool.Pool
}

func NewPGSyncStorage(databaseURL string) (*PgSyncStorage, error) {
func NewPGSyncStorage(databaseURL, migrationURL string, maxConns int32) (*PgSyncStorage, error) {

db, err := sql.Open("pgx", databaseURL)
// migrationURL is expected to be a session-mode (or direct) URL, since
// golang-migrate uses pg_advisory_lock() which does not hold across a
// transaction-mode pooler. Callers fall back to databaseURL when no
// dedicated migration URL is configured.
db, err := sql.Open("pgx", migrationURL)
if err != nil {
return nil, fmt.Errorf("failed to open sqlite3 database %w", err)
return nil, fmt.Errorf("failed to open postgres database for migrations: %w", err)
}
defer db.Close()
driver, err := pgxmigrate.WithInstance(db, &pgxmigrate.Config{})
if err != nil {
return nil, fmt.Errorf("failed to create migration driver %w", err)
Expand All @@ -51,9 +56,24 @@ func NewPGSyncStorage(databaseURL string) (*PgSyncStorage, error) {
return nil, fmt.Errorf("failed to run migrations %w", err)
}

pgxPool, err := pgxpool.New(context.Background(), databaseURL)
poolConfig, err := pgxpool.ParseConfig(databaseURL)
if err != nil {
return nil, fmt.Errorf("pgxpool.New: %w", err)
return nil, fmt.Errorf("pgxpool.ParseConfig: %w", err)
}
// The pgx default (max(4, NumCPU)) is too small for this service in production,
// but we don't impose our own default here so local / dev runs keep pgx's behaviour.
// Set DATABASE_MAX_CONNS in the environment that needs it.
if maxConns > 0 {
poolConfig.MaxConns = maxConns
}
// We run behind Supavisor in transaction mode (port 6543) in production, where
// server-side prepared statements break because each tx can land on a different
// backend. cache_describe + no statement cache is safe in both modes.
poolConfig.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe
poolConfig.ConnConfig.StatementCacheCapacity = 0
pgxPool, err := pgxpool.NewWithConfig(context.Background(), poolConfig)
if err != nil {
return nil, fmt.Errorf("pgxpool.NewWithConfig: %w", err)
}
return &PgSyncStorage{db: pgxPool}, nil
}
Expand Down
30 changes: 20 additions & 10 deletions store/postgres/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,80 @@ import (
)

func TestAddRecords(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestAddRecords(t, storage)
}

func TestUpdateRecords(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestUpdateRecords(t, storage)
}

func TestConflict(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestConflict(t, storage)
}

func TestAcquireAndCheckLock(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestAcquireAndCheckLock(t, storage)
}

func TestLockExpiration(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestLockExpiration(t, storage)
}

func TestMultipleInstanceLocks(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestMultipleInstanceLocks(t, storage)
}

func TestReleaseLockIdempotent(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestReleaseLockIdempotent(t, storage)
}

func TestExclusiveLock(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestExclusiveLock(t, storage)
}

func TestExclusiveLockSameInstance(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestExclusiveLockSameInstance(t, storage)
}

func TestDeleteExpiredLocks(t *testing.T) {
storage, err := NewPGSyncStorage(os.Getenv("TEST_PG_DATABASE_URL"))
url := os.Getenv("TEST_PG_DATABASE_URL")
storage, err := NewPGSyncStorage(url, url, 0)
require.NoError(t, err, "failed to connect")

(&store.StoreTest{}).TestDeleteExpiredLocks(t, storage)
Expand Down
14 changes: 12 additions & 2 deletions syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,19 @@ func NewPersistentSyncerServer(config *config.Config) (*PersistentSyncerServer,
var storage store.SyncStorage
var err error

if config.PgDatabaseUrl != "" {
if config.PgRuntimeUrl != "" {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we're the only users of data-sync, but do you think it maybe safer to fail fast if (the old) DATABASE_URL is set and DATABASE_RUNTIME_URL not set?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will change the semantic of which data source to use. I honestly think it is ok and it will allow us rollback fast if needed.

Copy link
Copy Markdown
Contributor

@dangeross dangeross Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is more for other users of it, on update if they don't change their env config they'll switch into sqlite storage

log.Printf("creating postgres storage\n")
storage, err = postgres.NewPGSyncStorage(config.PgDatabaseUrl)
// DATABASE_MIGRATION_URL is optional; when unset we run migrations over the
// runtime URL. Set it when the runtime URL points at a transaction-mode
// pooler (e.g. Supavisor :6543), since golang-migrate needs a session.
migrationURL := config.PgMigrationUrl
if migrationURL == "" {
log.Println("DATABASE_MIGRATION_URL not set; running migrations over DATABASE_RUNTIME_URL. " +
"If the runtime URL points at a transaction-mode pooler, migrations might fail — " +
"set DATABASE_MIGRATION_URL to a session-mode or direct URL.")
migrationURL = config.PgRuntimeUrl
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log a warning that the DATABASE_RUNTIME_URL will be used?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added log.

}
storage, err = postgres.NewPGSyncStorage(config.PgRuntimeUrl, migrationURL, config.PgMaxConns)
if err != nil {
return nil, err
}
Expand Down