diff --git a/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs b/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs index fa55bc8..081ee37 100644 --- a/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs +++ b/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs @@ -122,8 +122,9 @@ public async Task StartSyncAsync() /// called manually to await an immediate sync pass (e.g. before shutting down). For /// "fire and forget", use . /// + /// The cancellation token observed between records. /// A task that completes when the sync pass has finished. - public Task SyncStorageAsync() => _syncingService.RunSyncPassAsync(); + public Task SyncStorageAsync(CancellationToken ct = default) => _syncingService.RunSyncPassAsync(ct); /// /// Stops the attachment synchronization process. Cancels the sync pipeline, stops the periodic diff --git a/PowerSync/PowerSync.Common/Attachments/SyncingService.cs b/PowerSync/PowerSync.Common/Attachments/SyncingService.cs index 12dcaea..8e93865 100644 --- a/PowerSync/PowerSync.Common/Attachments/SyncingService.cs +++ b/PowerSync/PowerSync.Common/Attachments/SyncingService.cs @@ -22,6 +22,7 @@ internal sealed class SyncingService( ILogger logger) { private readonly SemaphoreSlim _startStopLock = new(1, 1); + private readonly SemaphoreSlim _syncPassLock = new(1, 1); private CancellationTokenSource? _internalCts; private Channel? _syncSignals; @@ -112,41 +113,57 @@ private async Task StopSyncInternalAsync() /// /// Runs one sync pass: fetches active attachments, processes them, then prunes archived rows. /// + /// The cancellation token observed between records. /// A task that completes when the pass has finished. - public Task RunSyncPassAsync() => attachmentService.WithContextAsync(async ctx => + public async Task RunSyncPassAsync(CancellationToken ct = default) { - var active = await ctx.GetActiveAttachmentsAsync(); - await ProcessAttachmentsAsync(active, ctx); - await DeleteArchivedAttachmentsAsync(ctx); - }); + await _syncPassLock.WaitAsync(ct); + try + { + var active = await attachmentService.WithContextAsync(ctx => ctx.GetActiveAttachmentsAsync()); + await ProcessAttachmentsAsync(active, ct); + await attachmentService.WithContextAsync(ctx => DeleteArchivedAttachmentsAsync(ctx)); + } + finally + { + _syncPassLock.Release(); + } + } /// - /// Processes attachments based on their state. Updates are saved in a single batch. + /// Processes attachments based on their state. Each state change is persisted as soon as its + /// transfer completes. /// /// Attachment records to process. - /// Attachment context for database operations. - /// A task that completes once all attachments have been processed and saved. - public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, AttachmentContext context) + /// The cancellation token observed between records. + /// A task that completes once all attachments have been processed. + public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, CancellationToken ct = default) { - var updates = new List(); - foreach (var attachment in attachments) { - Attachment? changed = attachment.State switch + ct.ThrowIfCancellationRequested(); + + var snapshotState = attachment.State; + Attachment? changed = snapshotState switch { AttachmentState.QueuedUpload => await UploadAttachmentAsync(attachment), AttachmentState.QueuedDownload => await DownloadAttachmentAsync(attachment), - AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment, context), + AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment), _ => null, }; if (changed is not null) { - updates.Add(changed); + await attachmentService.WithContextAsync(async ctx => + { + var current = await ctx.GetAttachmentAsync(attachment.Id); + if (current?.State == snapshotState) + { + await ctx.SaveAttachmentsAsync([changed]); + } + }); } } - - await context.SaveAttachmentsAsync(updates); } /// @@ -231,9 +248,8 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, /// On failure, defers to or archives. /// /// The attachment to delete. - /// The attachment context for database operations. /// The archived attachment, or null on success or retry. - public async Task DeleteAttachmentAsync(Attachment attachment, AttachmentContext context) + public async Task DeleteAttachmentAsync(Attachment attachment) { try { @@ -243,7 +259,17 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, await localStorage.DeleteFileAsync(attachment.LocalUri); } - await context.DeleteAttachmentAsync(attachment.Id); + await attachmentService.WithContextAsync(async ctx => + { + var current = await ctx.GetAttachmentAsync(attachment.Id); + if (current?.State != AttachmentState.QueuedDelete) + { + return; + } + + await ctx.DeleteAttachmentAsync(attachment.Id); + }); + return null; } catch (Exception error) @@ -326,7 +352,7 @@ private async Task SyncSignalConsumerAsync(CancellationToken ct) try { - await RunSyncPassAsync(); + await RunSyncPassAsync(ct); } catch (OperationCanceledException) {