Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion PowerSync/PowerSync.Common/Attachments/AttachmentQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ public async Task StartSyncAsync()
/// called manually to await an immediate sync pass (e.g. before shutting down). For
/// "fire and forget", use <see cref="TriggerSync"/>.
/// </remarks>
/// <param name="ct">The cancellation token observed between records.</param>
/// <returns>A task that completes when the sync pass has finished.</returns>
public Task SyncStorageAsync() => _syncingService.RunSyncPassAsync();
public Task SyncStorageAsync(CancellationToken ct = default) => _syncingService.RunSyncPassAsync(ct);

/// <summary>
/// Stops the attachment synchronization process. Cancels the sync pipeline, stops the periodic
Expand Down
66 changes: 46 additions & 20 deletions PowerSync/PowerSync.Common/Attachments/SyncingService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal sealed class SyncingService(
ILogger logger)
{
private readonly SemaphoreSlim _startStopLock = new(1, 1);
private readonly SemaphoreSlim _syncPassLock = new(1, 1);
private CancellationTokenSource? _internalCts;
private Channel<bool>? _syncSignals;

Expand Down Expand Up @@ -112,41 +113,57 @@ private async Task StopSyncInternalAsync()
/// <summary>
/// Runs one sync pass: fetches active attachments, processes them, then prunes archived rows.
/// </summary>
/// <param name="ct">The cancellation token observed between records.</param>
/// <returns>A task that completes when the pass has finished.</returns>
public Task RunSyncPassAsync() => attachmentService.WithContextAsync(async ctx =>
public async Task RunSyncPassAsync(CancellationToken ct = default)
{
var active = await ctx.GetActiveAttachmentsAsync();
await ProcessAttachmentsAsync(active, ctx);
await DeleteArchivedAttachmentsAsync(ctx);
});
await _syncPassLock.WaitAsync(ct);
try
{
var active = await attachmentService.WithContextAsync(ctx => ctx.GetActiveAttachmentsAsync());
await ProcessAttachmentsAsync(active, ct);
await attachmentService.WithContextAsync(ctx => DeleteArchivedAttachmentsAsync(ctx));
}
finally
{
_syncPassLock.Release();
}
}

/// <summary>
/// Processes attachments based on their state. Updates are saved in a single batch.
/// Processes attachments based on their state. Each state change is persisted as soon as its
/// transfer completes.
/// </summary>
/// <param name="attachments">Attachment records to process.</param>
/// <param name="context">Attachment context for database operations.</param>
/// <returns>A task that completes once all attachments have been processed and saved.</returns>
public async Task ProcessAttachmentsAsync(IReadOnlyList<Attachment> attachments, AttachmentContext context)
/// <param name="ct">The cancellation token observed between records.</param>
/// <returns>A task that completes once all attachments have been processed.</returns>
public async Task ProcessAttachmentsAsync(IReadOnlyList<Attachment> attachments, CancellationToken ct = default)
{
var updates = new List<Attachment>();

foreach (var attachment in attachments)
{
Attachment? changed = attachment.State switch
ct.ThrowIfCancellationRequested();

var snapshotState = attachment.State;
Attachment? changed = snapshotState switch
{
AttachmentState.QueuedUpload => await UploadAttachmentAsync(attachment),
AttachmentState.QueuedDownload => await DownloadAttachmentAsync(attachment),
AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment, context),
AttachmentState.QueuedDelete => await DeleteAttachmentAsync(attachment),
_ => null,
};

if (changed is not null)
{
updates.Add(changed);
await attachmentService.WithContextAsync(async ctx =>
{
var current = await ctx.GetAttachmentAsync(attachment.Id);
if (current?.State == snapshotState)
{
await ctx.SaveAttachmentsAsync([changed]);
}
});
}
}

await context.SaveAttachmentsAsync(updates);
}

/// <summary>
Expand Down Expand Up @@ -231,9 +248,8 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList<Attachment> attachments,
/// On failure, defers to <see cref="IAttachmentErrorHandler"/> or archives.
/// </summary>
/// <param name="attachment">The attachment to delete.</param>
/// <param name="context">The attachment context for database operations.</param>
/// <returns>The archived attachment, or <c>null</c> on success or retry.</returns>
public async Task<Attachment?> DeleteAttachmentAsync(Attachment attachment, AttachmentContext context)
public async Task<Attachment?> DeleteAttachmentAsync(Attachment attachment)
{
try
{
Expand All @@ -243,7 +259,17 @@ public async Task ProcessAttachmentsAsync(IReadOnlyList<Attachment> attachments,
await localStorage.DeleteFileAsync(attachment.LocalUri);
}

await context.DeleteAttachmentAsync(attachment.Id);
await attachmentService.WithContextAsync(async ctx =>
{
var current = await ctx.GetAttachmentAsync(attachment.Id);
if (current?.State != AttachmentState.QueuedDelete)
{
return;
}

await ctx.DeleteAttachmentAsync(attachment.Id);
});

return null;
}
catch (Exception error)
Expand Down Expand Up @@ -326,7 +352,7 @@ private async Task SyncSignalConsumerAsync(CancellationToken ct)

try
{
await RunSyncPassAsync();
await RunSyncPassAsync(ct);
}
catch (OperationCanceledException)
{
Expand Down
Loading