Skip to content

Commit 6a1efff

Browse files
committed
Fix: 'column XMIN does not exist' exception;fix: live queries not honouring query parameters; improve live queris' dedupe boundary to: SQL text + query params + transaction context; chore: internal refactors and other fixes
1 parent c7a2f5c commit 6a1efff

39 files changed

Lines changed: 755 additions & 702 deletions

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ It lets you declare remote data as local tables (views):
361361
await db.query(`
362362
CREATE VIEW users AS
363363
SELECT * FROM users
364-
WITH (replication_origin = '/api/db');
364+
WITH (replication_origin = 'postgres:/api/db');
365365
`);
366366
```
367367

@@ -383,7 +383,7 @@ With just an extra keyword, you get **automatic sync between local and remote st
383383
await db.query(`
384384
CREATE REALTIME VIEW users AS
385385
SELECT * FROM users
386-
WITH (replication_origin = '/api/db');
386+
WITH (replication_origin = 'postgres:/api/db');
387387
`);
388388
```
389389

Lines changed: 27 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
11
import { LinkedQLClient } from './LinkedQLClient.js';
22
import { RealtimeClient } from '../../proc/realtime/RealtimeClient.js';
33
import { MainstreamSchemaInference } from './MainstreamSchemaInference.js';
4-
import { MainstreamWalEngine } from './MainstreamWalEngine.js';
54
import { SQLParser } from '../../lang/SQLParser.js';
65
import { registry } from '../../lang/registry.js';
76
import { normalizeQueryArgs } from './util.js';
87
import { Result } from '../Result.js';
98

10-
export class MainstreamDBClient extends LinkedQLClient {
9+
export class MainstreamClient extends LinkedQLClient {
1110

12-
// Standard getters: parsers, resolver, wal
11+
// Standard getters: parsers, resolver
1312

1413
#parser;
15-
#wal;
1614
#live;
1715

1816
get parser() { return this.#parser; }
1917
get resolver() {
2018
return super.resolveGetResolver(() =>
21-
new MainstreamSchemaInference({ client: this, dialect: this.dialect }));
19+
new MainstreamSchemaInference({ mainstreamClient: this, dialect: this.dialect }));
2220
}
23-
get wal() { return this.#wal; }
2421
get live() { return this.#live; }
2522

2623
// Internal
@@ -33,64 +30,54 @@ export class MainstreamDBClient extends LinkedQLClient {
3330
super(options);
3431

3532
this.#parser = new SQLParser({ dialect: this.dialect });
36-
this.#wal = new MainstreamWalEngine({
37-
client: this,
38-
drainMode: 'drain',
39-
lifecycleHook: async (status) => {
40-
await this.setCapability({ realtime: !!status });
41-
}
42-
});
4333

4434
this.#realtimeClient = new RealtimeClient(this);
4535
this.#live = {
4636
forget: async (id) => await this.#realtimeClient.forget(id),
4737
};
4838
}
4939

50-
async disconnect() {
51-
await this.#wal.close({ destroy: true });
52-
await super.disconnect();
53-
}
54-
5540
// ------------
5641

57-
async transaction(cb) {
42+
async begin(options = {}) {
43+
if (options.parentTx) {
44+
throw new Error(`Nested transactions are not supported on mainstream databasew for now`);
45+
}
46+
return await this._begin(options);
47+
}
48+
49+
async transaction(cb, options = {}) {
5850
if (typeof cb !== 'function') {
5951
throw new TypeError('transaction(cb): cb must be a function');
6052
}
61-
if (typeof this._beginTransaction !== 'function'
62-
|| typeof this._commitTransaction !== 'function'
63-
|| typeof this._rollbackTransaction !== 'function') {
64-
throw new Error('Transaction not supported by this client implementation');
65-
}
66-
67-
const tx = await this._beginTransaction();
53+
54+
const tx = await this.begin(options);
6855

6956
try {
7057
const result = await cb(tx);
71-
await this._commitTransaction(tx);
58+
await tx.commit();
7259
return result;
7360
} catch (e) {
74-
await this._rollbackTransaction(tx);
61+
await tx.rollback();
7562
throw e;
7663
}
7764
}
7865

7966
// ------------
8067

8168
async query(...args) {
82-
const [_query, options] = normalizeQueryArgs(...args);
69+
const [_query, { tx: inputTx, ...options }] = normalizeQueryArgs(...args);
8370
const query = await this.#parser.parse(_query, options);
8471

85-
const resolveQuery = async (query, tx = null, ifHasSugars = false) => {
72+
const resolveQuery = async (query, tx, ifHasSugars = false) => {
8673
const schemaInference = this.resolver;
8774
return await schemaInference.resolveQuery(query, { tx, ifHasSugars });
8875
};
8976

9077
// Realtime query?
9178
if (options.live) {
92-
const resolvedQuery = await resolveQuery(query);
93-
return await this.#realtimeClient.query(resolvedQuery, options);
79+
const resolvedQuery = await resolveQuery(query, inputTx);
80+
return await this.#realtimeClient.query(resolvedQuery, { ...options, tx: inputTx });
9481
}
9582

9683
let result;
@@ -108,22 +95,25 @@ export class MainstreamDBClient extends LinkedQLClient {
10895
{ ...options, tx }
10996
);
11097
}
111-
});
98+
}, { parentTx: inputTx });
11299
} else {
113-
result = await this._query(await resolveQuery(query, null, true), options);
100+
result = await this._query(
101+
await resolveQuery(query, inputTx, true),
102+
{ ...options, tx: inputTx }
103+
);
114104
}
115105

116106
// The result instance
117107
return new Result({ rows: result.rows, rowCount: result.rowCount });
118108
}
119109

120110
async stream(...args) {
121-
const [_query, options] = normalizeQueryArgs(...args);
111+
const [_query, { tx: inputTx, ...options }] = normalizeQueryArgs(...args);
122112
const query = await this.#parser.parse(_query, options);
123113

124114
const schemaInference = this.resolver;
125-
const resolvedQuery = await schemaInference.resolveQuery(query, { ...options, ifHasSugars: true });
115+
const resolvedQuery = await schemaInference.resolveQuery(query, { ...options, tx: inputTx, ifHasSugars: true });
126116

127-
return await this._stream(resolvedQuery, options);
117+
return await this._stream(resolvedQuery, { ...options, tx: inputTx });
128118
}
129119
}

src/clients/abstracts/MainstreamSchemaInference.js

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
1-
import { SchemaInference as BaseSchemaInference } from '../../lang/SchemaInference.js';
1+
import { SchemaInference } from '../../lang/SchemaInference.js';
22
import { normalizeRelationSelectorArg, parseRelationSelectors } from './util.js';
33
import { registry } from '../../lang/registry.js';
44

5-
export class MainstreamSchemaInference extends BaseSchemaInference {
5+
export class MainstreamSchemaInference extends SchemaInference {
66

7-
#client;
8-
get client() { return this.#client; }
7+
#mainstreamClient;
98

10-
constructor({ client, ...options }) {
9+
constructor({ mainstreamClient, ...options }) {
1110
super(options);
12-
this.#client = client;
11+
this.#mainstreamClient = mainstreamClient;
1312
}
1413

1514
async showCreate(selector, { structured = false, tx = null } = {}) {
1615
selector = normalizeRelationSelectorArg(selector);
1716
const sql = this.#composeShowCreateSQL(selector);
18-
const result = await this.#client._query(sql, { tx }); // Must be _query() not query()
17+
const result = await this.#mainstreamClient._query(sql, { tx }); // Must be _query() not query()
1918
return await this.#formatShowCreateResult(result.rows, structured);
2019
}
2120

src/clients/abstracts/MainstreamWalEngine.js renamed to src/clients/abstracts/MainstreamWal.js

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1-
import { WalEngine as BaseWalEngine } from '../../proc/timeline/WalEngine.js';
1+
import { LinkedQlWal } from '../../proc/timeline/LinkedQlWal.js';
22
import { ConflictError } from '../../flashql/errors/ConflictError.js';
33

4-
export class MainstreamWalEngine extends BaseWalEngine {
4+
export class MainstreamWal extends LinkedQlWal {
55

6-
#client;
7-
get client() { return this.#client; }
6+
#mainstreamClient;
87

9-
constructor({ client, ...options }) {
10-
super(options);
11-
this.#client = client;
8+
constructor({ mainstreamClient, ...options }) {
9+
super({
10+
...options,
11+
drainMode: 'drain',
12+
lifecycleHook: async (status) => {
13+
if (status) {
14+
await this._setupRealtime();
15+
} else {
16+
await this._teardownRealtime();
17+
}
18+
}
19+
});
20+
this.#mainstreamClient = mainstreamClient;
1221
}
1322

1423
_quoteIdent(name) {
@@ -42,8 +51,8 @@ export class MainstreamWalEngine extends BaseWalEngine {
4251
if (mvccKey) {
4352
if (!event.mvccTag)
4453
throw new TypeError(`Missing event.mvccTag for the specified mvccKey ${mvccKey}`);
45-
const mvccExpr = this.#client.dialect === 'postgres' && mvccKey.toUpperCase() === 'XMIN'
46-
? 'CAST(CAST(xmin AS TEXT) AS BIGINT)'
54+
const mvccExpr = this.#mainstreamClient.dialect === 'postgres' && mvccKey.toUpperCase() === 'XMIN'
55+
? 'CAST(xmin AS TEXT)'
4756
: this._quoteIdent(mvccKey);
4857
sql += ` AND ${mvccExpr} = ${this._serializeValue(event.mvccTag)}`;
4958
}
@@ -82,15 +91,15 @@ export class MainstreamWalEngine extends BaseWalEngine {
8291

8392
if (!sql) continue;
8493

85-
const result = await this.#client._query(sql, { tx });
94+
const result = await this.#mainstreamClient._query(sql, { tx });
8695
if ((op === 'update' || op === 'delete') && result?.rowCount === 0) {
8796
throw new ConflictError(`[${this._quoteQualifiedRelation(relation)}] Origin row version no longer matches the expected version`);
8897
}
8998
}
9099
};
91100

92-
if (typeof this.#client.transaction === 'function') {
93-
return await this.#client.transaction(async (tx) => {
101+
if (typeof this.#mainstreamClient.transaction === 'function') {
102+
return await this.#mainstreamClient.transaction(async (tx) => {
94103
await applyCommit(tx);
95104
});
96105
}

0 commit comments

Comments
 (0)