diff --git a/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs b/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs
index fa55bc8..081ee37 100644
--- a/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs
+++ b/PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs
@@ -122,8 +122,9 @@ public async Task StartSyncAsync()
/// called manually to await an immediate sync pass (e.g. before shutting down). For
/// "fire and forget", use .
///
+ /// The cancellation token observed between records.
/// A task that completes when the sync pass has finished.
- public Task SyncStorageAsync() => _syncingService.RunSyncPassAsync();
+ public Task SyncStorageAsync(CancellationToken ct = default) => _syncingService.RunSyncPassAsync(ct);
///
/// Stops the attachment synchronization process. Cancels the sync pipeline, stops the periodic
diff --git a/PowerSync/PowerSync.Common/Attachments/SyncingService.cs b/PowerSync/PowerSync.Common/Attachments/SyncingService.cs
index 12dcaea..8e93865 100644
--- a/PowerSync/PowerSync.Common/Attachments/SyncingService.cs
+++ b/PowerSync/PowerSync.Common/Attachments/SyncingService.cs
@@ -22,6 +22,7 @@ internal sealed class SyncingService(
ILogger logger)
{
private readonly SemaphoreSlim _startStopLock = new(1, 1);
+ private readonly SemaphoreSlim _syncPassLock = new(1, 1);
private CancellationTokenSource? _internalCts;
private Channel? _syncSignals;
@@ -112,41 +113,57 @@ private async Task StopSyncInternalAsync()
///
/// Runs one sync pass: fetches active attachments, processes them, then prunes archived rows.
///
+ /// The cancellation token observed between records.
/// A task that completes when the pass has finished.
- public Task RunSyncPassAsync() => attachmentService.WithContextAsync(async ctx =>
+ public async Task RunSyncPassAsync(CancellationToken ct = default)
{
- var active = await ctx.GetActiveAttachmentsAsync();
- await ProcessAttachmentsAsync(active, ctx);
- await DeleteArchivedAttachmentsAsync(ctx);
- });
+ await _syncPassLock.WaitAsync(ct);
+ try
+ {
+ var active = await attachmentService.WithContextAsync(ctx => ctx.GetActiveAttachmentsAsync());
+ await ProcessAttachmentsAsync(active, ct);
+ await attachmentService.WithContextAsync(ctx => DeleteArchivedAttachmentsAsync(ctx));
+ }
+ finally
+ {
+ _syncPassLock.Release();
+ }
+ }
///
- /// Processes attachments based on their state. Updates are saved in a single batch.
+ /// Processes attachments based on their state. Each state change is persisted as soon as its
+ /// transfer completes.
///
/// Attachment records to process.
- /// Attachment context for database operations.
- /// A task that completes once all attachments have been processed and saved.
- public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, AttachmentContext context)
+ /// The cancellation token observed between records.
+ /// A task that completes once all attachments have been processed.
+ public async Task ProcessAttachmentsAsync(IReadOnlyList attachments, CancellationToken ct = default)
{
- var updates = new List();
-
foreach (var attachment in attachments)
{
- Attachment? changed = attachment.State switch
+ ct.ThrowIfCancellationRequested();
+
+ var snapshotState = attachment.State;
+ Attachment? changed = snapshotState switch
{
AttachmentState.QueuedUpload => await UploadAttachmentAsync(attachment),
AttachmentState.QueuedDownload => await DownloadAttachmentAsync(attachment),
- AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment, context),
+ AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment),
_ => null,
};
if (changed is not null)
{
- updates.Add(changed);
+ await attachmentService.WithContextAsync(async ctx =>
+ {
+ var current = await ctx.GetAttachmentAsync(attachment.Id);
+ if (current?.State == snapshotState)
+ {
+ await ctx.SaveAttachmentsAsync([changed]);
+ }
+ });
}
}
-
- await context.SaveAttachmentsAsync(updates);
}
///
@@ -231,9 +248,8 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList attachments,
/// On failure, defers to or archives.
///
/// The attachment to delete.
- /// The attachment context for database operations.
/// The archived attachment, or null on success or retry.
- public async Task DeleteAttachmentAsync(Attachment attachment, AttachmentContext context)
+ public async Task DeleteAttachmentAsync(Attachment attachment)
{
try
{
@@ -243,7 +259,17 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList attachments,
await localStorage.DeleteFileAsync(attachment.LocalUri);
}
- await context.DeleteAttachmentAsync(attachment.Id);
+ await attachmentService.WithContextAsync(async ctx =>
+ {
+ var current = await ctx.GetAttachmentAsync(attachment.Id);
+ if (current?.State != AttachmentState.QueuedDelete)
+ {
+ return;
+ }
+
+ await ctx.DeleteAttachmentAsync(attachment.Id);
+ });
+
return null;
}
catch (Exception error)
@@ -326,7 +352,7 @@ private async Task SyncSignalConsumerAsync(CancellationToken ct)
try
{
- await RunSyncPassAsync();
+ await RunSyncPassAsync(ct);
}
catch (OperationCanceledException)
{