Skip to content

Commit 4d4f011

Browse files
committed
fix: don't fail without status != 0
1 parent 13e063e commit 4d4f011

4 files changed

Lines changed: 65 additions & 59 deletions

File tree

src/sync/dependency-tree.ts

Lines changed: 45 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
import { log } from "../log.ts";
2+
import { tracer, withSpan } from "../otel.ts";
23

34
export type Dependency =
45
// a source table with dependencies
56
| {
6-
sourceTable: string;
7-
sourceColumn: string[];
8-
referencedTable: string;
9-
referencedColumn: string[];
10-
}
7+
sourceTable: string;
8+
sourceColumn: string[];
9+
referencedTable: string;
10+
referencedColumn: string[];
11+
}
1112
// a source table with no dependencies
1213
| {
13-
sourceTable: string;
14-
sourceColumn: null;
15-
referencedTable: null;
16-
referencedColumn: null;
17-
};
14+
sourceTable: string;
15+
sourceColumn: null;
16+
referencedTable: null;
17+
referencedColumn: null;
18+
};
1819

1920
/** A pointer from a current table context to another */
2021
interface Pointer {
@@ -46,34 +47,34 @@ export type DependencyResolutionNotice =
4647
| { kind: "too_few_rows"; table: string; requested: number; found: number };
4748

4849
export type TableRows<
49-
T extends Record<string, unknown> = Record<string, unknown>,
50+
T extends Record<string, unknown> = Record<string, unknown>
5051
> = Record<TableName, T[]>;
5152

5253
export type FindAllDependenciesError =
5354
| {
54-
kind: "error";
55-
type: "unexpected_error";
56-
error: Error;
57-
}
55+
kind: "error";
56+
type: "unexpected_error";
57+
error: Error;
58+
}
5859
| {
59-
kind: "error";
60-
type: "max_table_iterations_reached";
61-
};
60+
kind: "error";
61+
type: "max_table_iterations_reached";
62+
};
6263

6364
export type FindAllDependenciesResult<
64-
T extends Record<string, unknown> = Record<string, unknown>,
65+
T extends Record<string, unknown> = Record<string, unknown>
6566
> =
6667
| {
67-
kind: "ok";
68-
items: TableRows<T>;
69-
notices: DependencyResolutionNotice[];
70-
}
68+
kind: "ok";
69+
items: TableRows<T>;
70+
notices: DependencyResolutionNotice[];
71+
}
7172
| FindAllDependenciesError;
7273

7374
export type CursorOptions = { requiredRows: number; seed: number };
7475

7576
export interface DatabaseConnector<
76-
T extends InsertableTuple = InsertableTuple,
77+
T extends InsertableTuple = InsertableTuple
7778
> {
7879
cursor(table: string, options: CursorOptions): Cursor<T>;
7980
dependencies(schema: string): Promise<Dependency[]>;
@@ -120,11 +121,11 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
120121
private static readonly MAX_TABLE_ITERATIONS = 100;
121122
constructor(
122123
private readonly connector: DatabaseConnector<T>,
123-
private readonly options: DependencyAnalyzerOptions,
124+
private readonly options: DependencyAnalyzerOptions
124125
) {}
125126

126127
async findAllDependencies(
127-
schema: string,
128+
schema: string
128129
): Promise<FindAllDependenciesResult<T["data"]>> {
129130
log.debug("Starting dependency resolution", "dependency-resolution");
130131
this.seen.clear();
@@ -148,10 +149,8 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
148149
const removeTable = (i: number) => {
149150
if (remainingTables[i] === undefined) {
150151
log.error(
151-
`Attempted to remove table ${
152-
remainingTables[i]
153-
} but it was not found`,
154-
"dependency-resolution",
152+
`Attempted to remove table ${remainingTables[i]} but it was not found`,
153+
"dependency-resolution"
155154
);
156155
return;
157156
}
@@ -180,7 +179,7 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
180179
if (tableItems === undefined) {
181180
log.error(
182181
`Attempted to access table ${table} but it was not found in the items map`,
183-
"dependency-resolution",
182+
"dependency-resolution"
184183
);
185184
continue;
186185
}
@@ -201,7 +200,7 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
201200
if (iterator.done) {
202201
log.debug(
203202
`Notice: Table \`${table}\` has fewer rows (${tableItems.length}) than the required (${this.options.requiredRows})`,
204-
"dependency-resolution",
203+
"dependency-resolution"
205204
);
206205
notices.push({
207206
kind: "too_few_rows",
@@ -218,14 +217,14 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
218217
const results = await this.traverseDependencyChain(
219218
graph,
220219
table,
221-
sourceValue,
220+
sourceValue
222221
);
223222
for (const result of results) {
224223
const tableItems = items[result.table];
225224
if (tableItems === undefined) {
226225
log.error(
227226
`Attempted to access table ${result.table} but it was not found in the items map`,
228-
"dependency-resolution",
227+
"dependency-resolution"
229228
);
230229
continue;
231230
}
@@ -238,7 +237,7 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
238237
if (chainExceedsMaxRows) {
239238
log.warn(
240239
`Notice: Table \`${table}\` has more rows than the maximum ${this.options.maxRows}`,
241-
"dependency-resolution",
240+
"dependency-resolution"
242241
);
243242
notices.push({
244243
kind: "incomplete_dependency_chain",
@@ -255,7 +254,7 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
255254
if (tableItems === undefined) {
256255
log.error(
257256
`Attempted to access table ${table} but it was not found in the items map`,
258-
"dependency-resolution",
257+
"dependency-resolution"
259258
);
260259
continue;
261260
}
@@ -265,12 +264,10 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
265264
}
266265
}
267266
log.info(
268-
`Found ${Object.keys(items).length} tables with ${
269-
Object.values(
270-
items,
271-
).reduce((acc, table) => acc + table.length, 0)
272-
} rows`,
273-
"dependency-resolution",
267+
`Found ${Object.keys(items).length} tables with ${Object.values(
268+
items
269+
).reduce((acc, table) => acc + table.length, 0)} rows`,
270+
"dependency-resolution"
274271
);
275272
return { kind: "ok", items, notices };
276273
} catch (error) {
@@ -292,16 +289,16 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
292289
async traverseDependencyChain(
293290
graph: DependencyGraph,
294291
source: string,
295-
value: T,
292+
value: T
296293
): Promise<T[]> {
297294
const table = graph.get(source);
298295
if (!table) {
299296
throw new Error(
300-
`Table not declared in dependency graph. The graph should include a key for all existing tables in a database, even if they have no dependencies`,
297+
`Table not declared in dependency graph. The graph should include a key for all existing tables in a database, even if they have no dependencies`
301298
);
302299
}
303300
function valuesFor(
304-
pointer: Pointer,
301+
pointer: Pointer
305302
):
306303
| { ok: false; column: string[] }
307304
| { ok: true; params: Record<string, unknown> } {
@@ -331,11 +328,11 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
331328
}
332329
const referenced = await this.connector.get(
333330
dep.referencedTable,
334-
values.params,
331+
values.params
335332
);
336333
if (!referenced) {
337334
throw new Error(
338-
`Found an existing FK requirement but there was no corresponding row in ${dep.referencedTable}. Is the database in a consistent state?`,
335+
`Found an existing FK requirement but there was no corresponding row in ${dep.referencedTable}. Is the database in a consistent state?`
339336
);
340337
}
341338
const hashed = this.connector.hash(referenced);
@@ -347,7 +344,7 @@ export class DependencyAnalyzer<T extends InsertableTuple = InsertableTuple> {
347344
const next = await this.traverseDependencyChain(
348345
graph,
349346
dep.referencedTable,
350-
referenced,
347+
referenced
351348
);
352349
results.push(...next);
353350
}

src/sync/pg-connector.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import type {
88
TableRows,
99
} from "./dependency-tree.ts";
1010
import { log } from "../log.ts";
11+
import { trace } from "@opentelemetry/api";
1112

1213
const ctidSymbol = Symbol("ctid");
1314
type Row = NonNullable<postgres.Row & Iterable<postgres.Row>> & {

src/sync/schema.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,15 +47,20 @@ export class PostgresSchemaLink {
4747
return shippedPath;
4848
}
4949

50-
async syncSchema(): Promise<string> {
50+
async syncSchema(schemaName: string): Promise<string> {
5151
log.debug("Syncing schema", "schema:sync");
5252
const decoder = new TextDecoder();
5353
// const [dumping, omits] = await this.omitBigBois();
5454
const args = [
5555
// the owner doesn't exist
5656
"--no-owner",
57+
// not needed most likely
58+
"--no-comments",
5759
// privileges don't exist
5860
"--no-privileges",
61+
// providers like supabase have a ton of stuff we don't need in other schemas
62+
"--schema",
63+
schemaName,
5964
"--schema-only",
6065
this.url,
6166
];
@@ -70,22 +75,24 @@ export class PostgresSchemaLink {
7075
stdout: "piped",
7176
stderr: "piped",
7277
});
78+
const start = Date.now();
7379
const output = await command.output();
74-
span?.addEvent("sync:end");
80+
const end = Date.now();
81+
span?.addEvent("sync:end", end, start);
7582
span?.setAttribute("outputBytes", output.stdout.byteLength);
76-
if (output.stderr.byteLength > 0) {
83+
if (output.code !== 0) {
7784
const stderr = decoder.decode(output.stderr);
7885
span?.setStatus({ code: SpanStatusCode.ERROR, message: stderr });
7986
log.error(`Error: ${stderr}`, "schema:sync");
8087
throw new Error(stderr);
81-
} else {
82-
log.info(
83-
`Dumped schema. bytes=${output.stdout.byteLength}`,
84-
"schema:sync"
85-
);
86-
const stdout = decoder.decode(output.stdout);
87-
return stdout;
8888
}
89+
log.info(`Dumped schema. bytes=${output.stdout.byteLength}`, "schema:sync");
90+
const stdout = decoder.decode(output.stdout);
91+
return this.sanitizeSchema(stdout);
92+
}
93+
94+
private sanitizeSchema(schema: string): string {
95+
return schema.replace(/^CREATE SCHEMA\s./, "");
8996
}
9097

9198
// async omitBigBois(): Promise<[TableCount[], TableCount[]]> {

src/sync/syncer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,12 @@ export class PostgresSyncer {
128128
return connector.getRecentQueries();
129129
})(),
130130
withSpan("syncSchema", () => {
131-
return link.syncSchema();
131+
return link.syncSchema(schemaName);
132132
})(),
133133
withSpan("resolveDependencies", async (span) => {
134134
span.setAttribute("schemaName", schemaName);
135135
const deps = await analyzer.findAllDependencies(schemaName);
136+
console.log(deps);
136137
if (deps.kind !== "ok") {
137138
span.setStatus({
138139
code: SpanStatusCode.ERROR,

0 commit comments

Comments
 (0)