Skip to content

Commit 3d5548f

Browse files
committed
Good working phase with bidi sync
1 parent feee156 commit 3d5548f

24 files changed

Lines changed: 1472 additions & 248 deletions
Lines changed: 68 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { WalEngine as BaseWalEngine } from '../../proc/timeline/WalEngine.js';
2+
import { ConflictError } from '../../flashql/errors/ConflictError.js';
23

34
export class MainstreamWalEngine extends BaseWalEngine {
45

@@ -10,52 +11,90 @@ export class MainstreamWalEngine extends BaseWalEngine {
1011
this.#client = client;
1112
}
1213

14+
_quoteIdent(name) {
15+
return `"${String(name).replace(/"/g, '""')}"`;
16+
}
17+
18+
_quoteQualifiedRelation({ namespace, name }) {
19+
return namespace
20+
? `${this._quoteIdent(namespace)}.${this._quoteIdent(name)}`
21+
: this._quoteIdent(name);
22+
}
23+
24+
_serializeValue(value) {
25+
if (value === null) return 'NULL';
26+
if (typeof value === 'number') {
27+
if (!Number.isFinite(value)) throw new TypeError('Cannot serialize non-finite number');
28+
return String(value);
29+
}
30+
if (typeof value === 'boolean') return value ? 'TRUE' : 'FALSE';
31+
if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'`;
32+
if (value instanceof Date) return `'${value.toISOString().replace(/'/g, "''")}'`;
33+
return `'${JSON.stringify(value).replace(/'/g, "''")}'`;
34+
}
35+
1336
#buildOriginPredicate(event, mvccKey) {
1437
let sql = event.relation.keyColumns.map((col) => {
1538
if (!(col in event.old)) throw new TypeError(`Missing value for key field ${col}`);
1639
return `${this._quoteIdent(col)} = ${this._serializeValue(event.old[col])}`;
1740
}).join(' AND ');
1841

1942
if (mvccKey) {
20-
if (!event.mvccTag) throw new TypeError(`Missing event.mvccTag for the specified mvccKey ${mvccKey}`);
21-
sql += `${this._quoteIdent(mvccKey)} = ${this._serializeValue(event.mvccTag)}`;
43+
if (!event.mvccTag)
44+
throw new TypeError(`Missing event.mvccTag for the specified mvccKey ${mvccKey}`);
45+
const mvccExpr = this.#client.dialect === 'postgres' && mvccKey.toUpperCase() === 'XMIN'
46+
? `CAST(CAST(${this._quoteIdent(mvccKey)} AS TEXT) AS INT)`
47+
: this._quoteIdent(mvccKey);
48+
sql += ` AND ${mvccExpr} = ${this._serializeValue(event.mvccTag)}`;
2249
}
2350

2451
return sql;
2552
}
2653

27-
async handleDownstreamCommit(commit) {
28-
// Steps:
29-
// begin transaction
54+
async applyDownstreamCommit(commit) {
55+
const applyCommit = async (tx = null) => {
56+
for (const event of commit.entries) {
57+
const { op, relation } = event;
58+
let sql;
3059

31-
for (const event of commit.entries) {
32-
const { op, relation } = event;
33-
let sql;
60+
if (op === 'insert') {
61+
const entries = Object.entries(event.new);
62+
sql = `
63+
INSERT INTO ${this._quoteQualifiedRelation(relation)}
64+
(${entries.map(([name]) => this._quoteIdent(name)).join(', ')})
65+
VALUES (${entries.map(([, value]) => this._serializeValue(value)).join(', ')})`;
66+
}
3467

35-
if (op === 'insert') {
36-
const entries = Object.entries(event.new);
37-
sql = `
38-
INSERT INTO ${this._quoteQualifiedRelation(relation)}
39-
(${entries.map(([name]) => this._quoteIdent(name)).join(', ')})
40-
VALUES (${entries.map(([, value]) => this._serializeValue(value)).join(', ')})`;
41-
}
68+
if (op === 'update') {
69+
const assignments = Object.entries(event.new)
70+
.map(([name, value]) => `${this._quoteIdent(name)} = ${this._serializeValue(value)}`);
71+
sql = `
72+
UPDATE ${this._quoteQualifiedRelation(relation)}
73+
SET ${assignments.join(', ')}
74+
WHERE ${this.#buildOriginPredicate(event, relation.mvccKey)}`;
75+
}
4276

43-
if (op === 'update') {
44-
const assignments = Object.entries(event.new)
45-
.map(([name, value]) => `${this._quoteIdent(name)} = ${this._serializeValue(value)}`);
46-
sql = `
47-
UPDATE ${this._quoteQualifiedRelation(relation)}
48-
SET ${assignments.join(', ')}
49-
WHERE ${this.#buildOriginPredicate(event, commit.mvccKey)}`;
50-
}
77+
if (op === 'delete') {
78+
sql = `
79+
DELETE FROM ${this._quoteQualifiedRelation(relation)}
80+
WHERE ${this.#buildOriginPredicate(event, relation.mvccKey)}`;
81+
}
5182

52-
if (op === 'delete') {
53-
sql = `
54-
DELETE FROM ${this._quoteQualifiedRelation(relation)}
55-
WHERE ${this.#buildOriginPredicate(event, commit.mvccKey)}`;
83+
if (!sql) continue;
84+
85+
const result = await this.#client._query(sql, { tx });
86+
if ((op === 'update' || op === 'delete') && result?.rowCount === 0) {
87+
throw new ConflictError(`[${this._quoteQualifiedRelation(relation)}] Origin row version no longer matches the expected version`);
88+
}
5689
}
90+
};
5791

58-
if (sql) await this.#client.query(sql, { tx });
92+
if (typeof this.#client.transaction === 'function') {
93+
return await this.#client.transaction(async (tx) => {
94+
await applyCommit(tx);
95+
});
5996
}
97+
98+
return await applyCommit();
6099
}
61-
}
100+
}

src/clients/abstracts/SimpleEmitter.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ export class SimpleEmitter {
2626
const s = this.#listeners.get(event);
2727
if (!s) return;
2828
for (const fn of s) {
29-
fn(payload);
3029
try {
30+
fn(payload);
3131
} catch (err) {
3232
// eslint-disable-next-line no-console
3333
console.error('emitter handler error', err);

src/clients/edge/BaseEdgeClient.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ export class BaseEdgeClient extends LinkedQLClient {
234234
responseJson.port.addEventListener(`${this.#workerEventNamespace}commit`, handleCommit);
235235
gcArray.push(() => responseJson.port.removeEventListener(`${this.#workerEventNamespace}commit`, handleCommit));
236236

237-
const gc = async ({ forget = false }) => {
237+
const gc = async ({ forget = false } = {}) => {
238238
responseJson.port?.close();
239239

240240
if (forget && options.id) {

src/clients/edge/EdgeWalEngine.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ export class EdgeWalEngine extends BaseWalEngine {
2222
return await super.subscribe(...args);
2323
}
2424

25-
async handleDownstreamCommit(commit, options = {}) {
25+
async applyDownstreamCommit(commit, options = {}) {
2626
const procName = 'wal:handle_downstream_commit';
2727
return await this.#client._exec(procName, { commit, options });
2828
}
29-
}
29+
}

src/clients/edge/EdgeWorker.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ export class EdgeWorker extends SimpleEmitter {
222222
}
223223

224224
if (op === 'wal:handle_downstream_commit') {
225-
return await this.#db.wal.handleDownstreamCommit(args.commit, args.options);
225+
return await this.#db.wal.applyDownstreamCommit(args.commit, args.options);
226226
}
227227
}
228228

src/clients/postgres/PGClient.js

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,17 +66,37 @@ export class PGClient extends MainstreamDBClient {
6666
}
6767

6868
async _beginTransaction() {
69-
const conn = await this.connect();
69+
let conn;
70+
if (this.#poolMode) {
71+
conn = await this.#driver.connect();
72+
} else {
73+
if (!this.#adminDriver) {
74+
await this.connect();
75+
}
76+
conn = this.#driver;
77+
}
7078
await conn.query('BEGIN');
7179
return { conn };
7280
}
7381

7482
async _commitTransaction(tx) {
75-
await tx.conn.query('COMMIT');
83+
try {
84+
await tx.conn.query('COMMIT');
85+
} finally {
86+
if (this.#poolMode) {
87+
await tx.conn.release();
88+
}
89+
}
7690
}
7791

7892
async _rollbackTransaction(tx) {
79-
await tx.conn.query('ROLLBACK');
93+
try {
94+
await tx.conn.query('ROLLBACK');
95+
} finally {
96+
if (this.#poolMode) {
97+
await tx.conn.release();
98+
}
99+
}
80100
}
81101

82102
async _query(query, { values = [], prepared = null, tx = null }) {

src/flashql/eval/ExprEngine.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,13 @@ export class ExprEngine {
198198
// -----------
199199
const DT = dataType.value();
200200
switch (DT) {
201-
case 'INT': return parseInt(L);
201+
case 'SMALLINT':
202+
case 'INT':
203+
case 'INTEGER':
204+
case 'BIGINT':
205+
case 'SERIAL':
206+
case 'BIGSERIAL':
207+
return Number(L);
202208
case 'TEXT': return String(L);
203209
case 'BOOLEAN': return Boolean(L);
204210
default: return L;

src/flashql/eval/QueryEngine.js

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,14 @@ export class QueryEngine {
346346
if (stmtNode.temporaryKW()) viewDef.persistence = 'temporary';
347347
if (stmtNode.replicationMode()) viewDef.replication_mode = stmtNode.replicationMode().toLowerCase();
348348
if (stmtNode.optionsClause()) {
349-
const viewOptions = this.#parser.optionsASTS_to_optionsDef(stmtNode.optionsClause().entries() || [], { prefix: '' });
350-
if (viewDef.replication_mode && viewOptions.replication_mode && viewOptions.replication_mode !== viewDef.replication_mode)
351-
throw new Error(`replication_mode=${viewOptions.replication_mode} cannot be used with a ${viewDef.replication_mode.toUpperCase()} view`);
352-
Object.assign(viewDef, viewOptions);
349+
const { replication_mode = viewDef.replication_mode, replication_origin, ...replication_opts } = this.#parser.optionsASTS_to_optionsDef(stmtNode.optionsClause().entries() || [], { prefix: '' });
350+
if (viewDef.replication_mode && replication_mode !== undefined && replication_mode !== viewDef.replication_mode)
351+
throw new Error(`replication_mode=${replication_mode} cannot be used with a ${viewDef.replication_mode.toUpperCase()} view`);
352+
Object.assign(viewDef, {
353+
replication_mode,
354+
replication_origin,
355+
replication_opts
356+
});
353357
}
354358

355359
if (stmtNode.orReplace()) {
@@ -375,9 +379,19 @@ export class QueryEngine {
375379
else if (action instanceof registry.RelationSourceExpr) {
376380
Object.assign(alterPayload, this.#parser.relationSourceExpr_to_relationSourceDef(action));
377381
} else if (action instanceof registry.OptionsSetClause) {
378-
Object.assign(alterPayload, this.#parser.optionsASTS_to_optionsDef(action.entries(), { prefix: '' }));
382+
const { replication_mode, replication_origin, ...replication_opts } = this.#parser.optionsASTS_to_optionsDef(action.entries(), { prefix: '' });
383+
Object.assign(alterPayload, {
384+
replication_mode,
385+
replication_origin,
386+
replication_opts
387+
});
379388
} else if (action instanceof registry.OptionsResetClause) {
380-
Object.assign(alterPayload, this.#parser.optionsASTS_to_optionsDef(action.entries(), { prefix: '', reset: true }));
389+
const { replication_mode, replication_origin, ...replication_opts } = this.#parser.optionsASTS_to_optionsDef(action.entries(), { prefix: '', reset: true });
390+
Object.assign(alterPayload, {
391+
replication_mode,
392+
replication_origin,
393+
replication_opts
394+
});
381395
} else {
382396
throw new Error(`Unsupported ALTER VIEW action ${action.NODE_NAME}`);
383397
}

src/flashql/storage/FlashWalEngine.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export class FlashWalEngine extends BaseWalEngine {
6666
return async () => await Promise.all(_gcArray.map((c) => c()));
6767
}
6868

69-
async handleDownstreamCommit(commit, { tx: inputTx = null } = {}) {
69+
async applyDownstreamCommit(commit, { tx: inputTx = null } = {}) {
7070

7171
const applyInTx = async (tx) => {
7272
for (const event of commit.entries) {

src/flashql/storage/StorageEngine.js

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { FlashSchemaInference } from './FlashSchemaInference.js';
66
import { FlashWalEngine } from './FlashWalEngine.js';
77
import { SyncManager } from '../sync/SyncManager.js';
88
import { MVCCEngine } from './MVCCEngine.js';
9+
import { SYSTEM_TAG } from './TableStorage.js';
910

1011
export class StorageEngine extends MVCCEngine {
1112

@@ -219,20 +220,44 @@ export class StorageEngine extends MVCCEngine {
219220
async #federateCommit(tx, timestamp) {
220221
const commitTime = this.txMeta(tx.id)?.commitTime;
221222

222-
const federateCommit = async (targetOrigin, entries, { queued }) => {
223-
const commit = { txId: tx.id, commitTime, entries, timestamp };
223+
const federateCommit = async (targetOrigin, entries, { queued = false } = {}) => {
224+
if (queued) {
225+
const outsyncQueue = tx.getRelation({ namespace: 'sys', name: 'sys_outsync_queue' });
226+
for (const changePayload of entries) {
227+
await outsyncQueue.insert({
228+
relation_id: changePayload.relation_id,
229+
origin: changePayload.origin,
230+
event_payload: changePayload.event,
231+
status: 'pending',
232+
retry_count: 0,
233+
last_error: null,
234+
created_at: timestamp,
235+
updated_at: timestamp,
236+
next_retry_at: null,
237+
}, { systemTag: SYSTEM_TAG });
238+
}
239+
return;
240+
}
241+
242+
const commit = {
243+
txId: tx.id,
244+
commitTime,
245+
entries: entries.map((changePayload) => changePayload.event),
246+
timestamp
247+
};
248+
224249
return targetOrigin
225-
? (await this.getUpstreamClient(targetOrigin)).wal.handleDownstreamCommit(commit)
226-
: await this.wal.handleDownstreamCommit(commit, { tx });
250+
? (await this.getUpstreamClient(targetOrigin)).wal.applyDownstreamCommit(commit)
251+
: await this.wal.applyDownstreamCommit(commit, { tx });
227252
};
228253

229-
for (const [targetOrigin, entries] of tx._upstreamLog) {
230-
await federateCommit(targetOrigin, entries, { queued: false });
231-
}
232-
233254
for (const [targetOrigin, entries] of tx._upstreamQueue) {
234255
await federateCommit(targetOrigin, entries, { queued: true });
235256
}
257+
258+
for (const [targetOrigin, entries] of tx._upstreamLog) {
259+
await federateCommit(targetOrigin, entries);
260+
}
236261
}
237262

238263
async #persistCommit(tx, timestamp) {
@@ -371,15 +396,19 @@ export class StorageEngine extends MVCCEngine {
371396
}
372397

373398
async commit(tx) {
374-
const returnValue = await super.commit(tx);
375-
399+
let timestamp;
376400
if (!this.#isHydrating) {
377401
if (this.#overwriteForward && !this.#forwardHistoryTruncated && tx._changeLog.length) {
378402
await this.#wal.truncateForward(this.#openedCommitTime);
379403
this.#forwardHistoryTruncated = true;
380404
}
381-
const timestamp = Date.now();
405+
timestamp = Date.now();
382406
await this.#federateCommit(tx, timestamp);
407+
}
408+
409+
const returnValue = await super.commit(tx);
410+
411+
if (!this.#isHydrating) {
383412
await this.#persistCommit(tx, timestamp);
384413
// Keep the original versionStop anchor intact until first mutating commit
385414
// performs forward-history truncation.

0 commit comments

Comments
 (0)