Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
393 changes: 351 additions & 42 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,49 @@ import { ServiceAssertionError } from '@powersync/lib-services-framework';
import {
BucketDataSource,
BucketDefinitionId,
HashMap,
ParameterIndexId,
ParameterIndexLookupCreator,
ParameterLookupDefinitionId,
SerializedBucketDataSourceWithDataSources,
SerializedParameterIndexLookupCreator,
serializedStreamBucketDataSourceEquality,
serializedStreamParameterIndexLookupCreatorEquality,
SerializedSyncPlanV1,
SyncConfigWithErrors
} from '@powersync/service-sync-rules';
import { SyncConfigDefinition } from '../storage-index.js';

export interface SerializedSyncConfigWithMapping {
plan: SerializedSyncPlanV1;
mapping: BucketDefinitionMapping;
}

export interface SyncConfigWithMapping {
syncConfig: SyncConfigWithErrors;
mapping: BucketDefinitionMapping | null;
}

export interface SyncConfigWithRequiredMapping {
syncConfigId?: string;
syncConfig: SyncConfigWithErrors;
mapping: BucketDefinitionMapping;
}

/**
* Represents a mapping from bucket data sources and parameter lookup sources to stable IDs used for bucket definition and parameter index persistence.
*
* An instance of BucketDefinitionMapping is associated with a specific SyncConfig.
* MongoHydrationState handles the mapping across multiple SyncConfigs in the same replication stream.
*/
export class BucketDefinitionMapping {
static fromSyncConfig(doc: Pick<SyncConfigDefinition, 'rule_mapping'>): BucketDefinitionMapping {
return new BucketDefinitionMapping(doc.rule_mapping?.definitions ?? {}, doc.rule_mapping?.parameter_indexes ?? {});
}

static fromParsedSyncRules(syncRules: SyncConfigWithErrors): BucketDefinitionMapping {
const definitionNames = syncRules.config.bucketDataSources.map((source) => source.uniqueName).sort();
const parameterKeys = syncRules.config.bucketParameterLookupSources
static fromParsedSyncConfig(syncConfig: SyncConfigWithErrors): BucketDefinitionMapping {
const definitionNames = syncConfig.config.bucketDataSources.map((source) => source.uniqueName).sort();
const parameterKeys = syncConfig.config.bucketParameterLookupSources
.map((source) => `${source.sourceId.lookupName}#${source.sourceId.queryId}`)
.sort();

Expand All @@ -32,15 +61,88 @@ export class BucketDefinitionMapping {
return new BucketDefinitionMapping(definitions, parameterLookups);
}

static constructIncrementalMappingFromSerializedPlans(
existing: SerializedSyncConfigWithMapping[],
newPlan: SerializedSyncPlanV1
): BucketDefinitionMapping {
// FIXME: These ids may conflict with existing mappings if sync configs are de-activated.
let nextBucketDefinitionId =
existing
.map((c) => c.mapping.allBucketDefinitionIds())
.flat()
.reduce((maxId, id) => Math.max(maxId, parseInt(id, 16)), 0) + 1;
function generateNewBucketDefinitionId(): BucketDefinitionId {
const id = nextBucketDefinitionId.toString(16);
nextBucketDefinitionId++;
return id;
}
let nextParameterIndexId =
existing
.map((c) => c.mapping.allParameterIndexIds())
.flat()
.reduce((maxId, id) => Math.max(maxId, parseInt(id, 16)), 0) + 1;
function generateNewParameterIndexId(): ParameterIndexId {
const id = nextParameterIndexId.toString(16);
nextParameterIndexId++;
return id;
}

const definitions: Record<string, BucketDefinitionId> = {};
const parameterLookups: Record<string, ParameterIndexId> = {};
const compatibleBuckets = new HashMap<SerializedBucketDataSourceWithDataSources, BucketDefinitionId>(
serializedStreamBucketDataSourceEquality
);
const compatibleParameterLookups = new HashMap<SerializedParameterIndexLookupCreator, ParameterIndexId>(
serializedStreamParameterIndexLookupCreatorEquality
);

for (const config of existing) {
for (const bucket of config.plan.buckets) {
compatibleBuckets.putIfAbsent({ bucket, dataSources: config.plan.dataSources }, () =>
config.mapping.bucketSourceIdByName(bucket.uniqueName)
);
}

for (const parameterLookup of config.plan.parameterIndexes) {
compatibleParameterLookups.putIfAbsent(parameterLookup, () =>
config.mapping.parameterLookupIdByKey(parameterLookupKey(parameterLookup.lookupScope))
);
}
}

for (const bucket of newPlan.buckets) {
const compatibleId = compatibleBuckets.get({ bucket, dataSources: newPlan.dataSources });
const id = compatibleId ?? generateNewBucketDefinitionId();
definitions[bucket.uniqueName] = id;
}

for (const parameterLookup of newPlan.parameterIndexes) {
const compatibleId = compatibleParameterLookups.get(parameterLookup);
const id = compatibleId ?? generateNewParameterIndexId();
parameterLookups[parameterLookupKey(parameterLookup.lookupScope)] = id;
}

return new BucketDefinitionMapping(definitions, parameterLookups);
}

constructor(
private definitions: Record<string, BucketDefinitionId> = {},
private parameterLookupMapping: Record<string, ParameterIndexId> = {}
) {}

/**
* Given a BucketDataSource within this SyncConfig, return the BucketDefinitionId, or throw if not found.
*
* The behavior is undefined if the source is associated with a different SyncConfig.
*/
bucketSourceId(source: BucketDataSource): BucketDefinitionId {
const defId = this.definitions[source.uniqueName];
return this.bucketSourceIdByName(source.uniqueName);
}

bucketSourceIdByName(uniqueName: string): BucketDefinitionId {
const defId = this.definitions[uniqueName];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for bucket source ${source.uniqueName}`);
throw new ServiceAssertionError(`No mapping found for bucket source ${uniqueName}`);
}
return defId;
}
Expand All @@ -54,22 +156,108 @@ export class BucketDefinitionMapping {
}

parameterLookupId(source: ParameterIndexLookupCreator): ParameterIndexId {
const key = this.parameterLookupKey(source.sourceId.lookupName, source.sourceId.queryId);
return this.parameterLookupIdByKey(parameterLookupKey(source.sourceId));
}

parameterLookupIdByKey(key: string): ParameterIndexId {
const defId = this.parameterLookupMapping[key];
if (defId == null) {
throw new ServiceAssertionError(`No mapping found for parameter lookup source ${key}`);
}
return defId;
}

private parameterLookupKey(lookupName: string, queryId: string) {
return `${lookupName}#${queryId}`;
}

serialize(): SyncConfigDefinition['rule_mapping'] {
return {
definitions: { ...this.definitions },
parameter_indexes: { ...this.parameterLookupMapping }
};
}
}

export class MultiSyncConfigBucketDefinitionMapping extends BucketDefinitionMapping {
private bucketDataSourceMappings = new WeakMap<BucketDataSource, BucketDefinitionMapping>();
private bucketDataSourceMappingsByName = new Map<string, SyncConfigWithRequiredMapping[]>();
private parameterLookupMappings = new WeakMap<ParameterIndexLookupCreator, BucketDefinitionMapping>();
private parameterLookupMappingsByKey = new Map<string, SyncConfigWithRequiredMapping[]>();
private mappings: BucketDefinitionMapping[];

constructor(syncConfigs: SyncConfigWithRequiredMapping[]) {
super();
this.mappings = syncConfigs.map((config) => config.mapping);

for (const config of syncConfigs) {
for (const source of config.syncConfig.config.bucketDataSources) {
this.bucketDataSourceMappings.set(source, config.mapping);
addMappingEntry(this.bucketDataSourceMappingsByName, source.uniqueName, config);
}
for (const source of config.syncConfig.config.bucketParameterLookupSources) {
this.parameterLookupMappings.set(source, config.mapping);
addMappingEntry(this.parameterLookupMappingsByKey, parameterLookupKey(source.sourceId), config);
}
}
}

bucketSourceId(source: BucketDataSource): BucketDefinitionId {
const mapping = this.bucketDataSourceMappings.get(source);
if (mapping != null) {
return mapping.bucketSourceId(source);
}

const id = this.unambiguousBucketSourceIdByName(source.uniqueName);
if (id == null) {
throw new ServiceAssertionError(`No mapping found for bucket source ${source.uniqueName}`);
}
return id;
}

allBucketDefinitionIds(): BucketDefinitionId[] {
return [...new Set(this.mappings.flatMap((mapping) => mapping.allBucketDefinitionIds()))];
}

parameterLookupId(source: ParameterIndexLookupCreator): ParameterIndexId {
const mapping = this.parameterLookupMappings.get(source);
if (mapping != null) {
return mapping.parameterLookupId(source);
}

const key = parameterLookupKey(source.sourceId);
const id = this.unambiguousParameterLookupIdByKey(key);
if (id == null) {
throw new ServiceAssertionError(
`No mapping found for parameter lookup source ${source.sourceId.lookupName}#${source.sourceId.queryId}`
);
}
return id;
}

allParameterIndexIds(): ParameterIndexId[] {
return [...new Set(this.mappings.flatMap((mapping) => mapping.allParameterIndexIds()))];
}

private unambiguousBucketSourceIdByName(uniqueName: string): BucketDefinitionId | null {
const entries = this.bucketDataSourceMappingsByName.get(uniqueName) ?? [];
const ids = new Set(entries.map((entry) => entry.mapping.bucketSourceIdByName(uniqueName)));
return ids.size == 1 ? [...ids][0] : null;
}

private unambiguousParameterLookupIdByKey(key: string): ParameterIndexId | null {
const entries = this.parameterLookupMappingsByKey.get(key) ?? [];
const ids = new Set(entries.map((entry) => entry.mapping.parameterLookupIdByKey(key)));
return ids.size == 1 ? [...ids][0] : null;
}
}

export function parameterLookupKey(id: ParameterLookupDefinitionId) {
return `${id.lookupName}#${id.queryId}`;
}

function addMappingEntry(
map: Map<string, SyncConfigWithRequiredMapping[]>,
key: string,
config: SyncConfigWithRequiredMapping
) {
const existing = map.get(key) ?? [];
existing.push(config);
map.set(key, existing);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export interface MongoBucketBatchOptions {
syncRules: HydratedSyncConfig;
groupId: number;
slotName: string;
syncConfigId?: bson.ObjectId | null;
syncConfigIds?: bson.ObjectId[];
lastCheckpointLsn: string | null;
keepaliveOp: InternalOpId | null;
resumeFromLsn: string | null;
Expand Down
Loading
Loading