From 023049a645ec4812f795c57467e84239a6538802 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 10 Mar 2026 00:46:00 +0530 Subject: [PATCH 1/3] RATIS-2430. Write snapshot to temporary path until finish --- .../apache/ratis/server/impl/ServerState.java | 13 +++++++++---- .../impl/SnapshotInstallationHandler.java | 9 +++++---- .../ratis/server/storage/SnapshotManager.java | 19 +++++++++++++++---- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index bcf11baf7a..7fb3f47afd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -470,11 +470,16 @@ RaftStorageImpl getStorage() { return raftStorage.getUnchecked(); } - void installSnapshot(InstallSnapshotRequestProto request) throws IOException { + void appendSnapshot(InstallSnapshotRequestProto request) throws IOException { // TODO: verify that we need to install the snapshot - StateMachine sm = server.getStateMachine(); - sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(request, sm); + snapshotManager.appendSnapshot(request, server.getStateMachine()); + } + + void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { + final StateMachine sm = server.getStateMachine(); + sm.pause(); // pause the SM right before publishing the snapshot atomically + // TODO: if there is a failure here, we need to rollback the snapshot installation. + snapshotManager.finalizeSnapshot(request); } private SnapshotInfo getLatestSnapshot() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 46b6aaf87f..57fce8adf8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -214,17 +214,18 @@ private CompletableFuture checkAndInstallSnapshot(Ins return future.thenApply(dummy -> reply); } - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); + final int expectedChunkIndex = nextChunkIndex.get(); if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + " (the expected index is " + expectedChunkIndex + ")"); } + // Append chunks to a temporary location first. Publish only when done=true. + state.appendSnapshot(request); + nextChunkIndex.incrementAndGet(); // update the committed index // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { + state.finalizeSnapshot(request); state.reloadStateMachine(lastIncluded); chunk0CallId.set(-1); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index a96001b598..d2d44e3d40 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -99,12 +99,16 @@ private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOEx return out; } - public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { + private File getSnapshotTmpDir(String requestId) { + return new File(this.snapshotTmpDir.get(), "snapshot-" + requestId); + } + + public void appendSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); // create a unique temporary directory - final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId()); + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); FileUtils.createDirectories(tmpDir); tmpDir.deleteOnExit(); @@ -164,9 +168,16 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st } } - if (snapshotChunkRequest.getDone()) { - rename(tmpDir, snapshotDir.get()); + } + + public void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); + if (!snapshotChunkRequest.getDone()) { + throw new IOException("Cannot finalize incomplete snapshot request: " + + ServerStringUtils.toInstallSnapshotRequestString(request)); } + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); + rename(tmpDir, snapshotDir.get()); } private static void rename(File tmpDir, File stateMachineDir) throws IOException { From b79a9cda01e5f3d00c5ef39864c08e230fb16a9f Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Tue, 10 Mar 2026 10:44:44 +0530 Subject: [PATCH 2/3] Add new test class for testing partial/failing transfer --- .../server/storage/SnapshotManagerTest.java | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java new file mode 100644 index 0000000000..593642698c --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.SnapshotRetentionPolicy; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.FileUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.security.MessageDigest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SnapshotManagerTest { + private static final class TestRaftStorageDirectory implements RaftStorageDirectory { + private final File root; + + private TestRaftStorageDirectory(File root) { + this.root = root; + } + + @Override + public File getRoot() { + return root; + } + + @Override + public boolean isHealthy() { + return true; + } + } + + private static final StateMachineStorage EMPTY_STORAGE = new StateMachineStorage() { + @Override + public void init(RaftStorage raftStorage) { + } + + @Override + public SnapshotInfo getLatestSnapshot() { + return null; + } + + @Override + public void format() { + } + + @Override + public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) { + } + }; + + private static InstallSnapshotRequestProto newSnapshotRequest( + String requestId, int requestIndex, boolean requestDone, String filename, byte[] fullData, + int offset, int chunkSize, boolean chunkDone) throws Exception { + final FileChunkProto fileChunk = FileChunkProto.newBuilder() + .setFilename(filename) + .setTotalSize(fullData.length) + .setFileDigest(ByteString.copyFrom(md5(fullData))) + .setChunkIndex(requestIndex) + .setOffset(offset) + .setData(ByteString.copyFrom(fullData, offset, chunkSize)) + .setDone(chunkDone) + .build(); + + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk = + InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder() + .setRequestId(requestId) + .setRequestIndex(requestIndex) + .setTermIndex(TermIndex.valueOf(1L, 10L).toProto()) + .addFileChunks(fileChunk) + .setTotalSize(fullData.length) + .setDone(requestDone) + .build(); + + return InstallSnapshotRequestProto.newBuilder() + .setSnapshotChunk(snapshotChunk) + .build(); + } + + private static byte[] md5(byte[] data) throws Exception { + return MessageDigest.getInstance("MD5").digest(data); + } + + @Test + public void testAppendOnlyAndFinalizePublish() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test").toFile(); + try { + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s1"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + final StateMachine stateMachine = mock(StateMachine.class); + when(stateMachine.getLatestSnapshot()).thenReturn(null); + + final File stateMachineDir = new File(root, RaftStorageDirectory.STATE_MACHINE_DIR_NAME); + FileUtils.createDirectories(stateMachineDir); + final File oldSnapshot = new File(stateMachineDir, "old.snapshot"); + Files.write(oldSnapshot.toPath(), "old".getBytes(StandardCharsets.UTF_8)); + + final byte[] fullData = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); + final File newSnapshot = new File(stateMachineDir, "new.snapshot"); + final String filename = new File( + RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "new.snapshot").toString(); + final File tmpSnapshot = new File(new File( + new File(root, RaftStorageDirectory.TMP_DIR_NAME), "snapshot-request-1"), "new.snapshot"); + final InstallSnapshotRequestProto chunk0 = newSnapshotRequest( + "request-1", 0, false, filename, fullData, 0, 8, false); + final InstallSnapshotRequestProto chunk1 = newSnapshotRequest( + "request-1", 1, true, filename, fullData, 8, 8, true); + + manager.appendSnapshot(chunk0, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.appendSnapshot(chunk1, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.finalizeSnapshot(chunk1); + Assertions.assertFalse(oldSnapshot.exists()); + Assertions.assertFalse(tmpSnapshot.exists()); + Assertions.assertArrayEquals(fullData, Files.readAllBytes(newSnapshot.toPath())); + } finally { + FileUtils.deleteFully(root); + } + } + + @Test + public void testFinalizeSnapshotRejectsIncompleteRequest() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test-incomplete").toFile(); + try { + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s2"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + + final InstallSnapshotRequestProto incomplete = newSnapshotRequest( + "request-2", 0, false, + new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "f.snapshot").toString(), + "abc".getBytes(StandardCharsets.UTF_8), 0, 3, true); + final IOException ioe = Assertions.assertThrows(IOException.class, + () -> manager.finalizeSnapshot(incomplete)); + Assertions.assertTrue(ioe.getMessage().contains("Cannot finalize incomplete snapshot request")); + } finally { + FileUtils.deleteFully(root); + } + } +} From 23bfd92d02b20609b6a82ec9a1d06757e5bc6cd3 Mon Sep 17 00:00:00 2001 From: Abhishek Pal Date: Fri, 12 Jun 2026 10:26:41 +0530 Subject: [PATCH 3/3] Address review comments --- .../apache/ratis/server/impl/ServerState.java | 17 ---------- .../impl/SnapshotInstallationHandler.java | 13 ++++++-- .../ratis/server/storage/SnapshotManager.java | 5 +-- .../server/storage/TestSnapshotManager.java | 32 ++++--------------- 4 files changed, 19 insertions(+), 48 deletions(-) rename ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java => ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java (87%) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 7fb3f47afd..ad3210d634 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -31,7 +31,6 @@ import org.apache.ratis.server.raftlog.memory.MemoryRaftLog; import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; import org.apache.ratis.server.storage.*; -import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; @@ -69,7 +68,6 @@ class ServerState { private final MemoizedSupplier stateMachineUpdater; /** local storage for log and snapshot */ private final MemoizedCheckedSupplier raftStorage; - private final SnapshotManager snapshotManager; private final AtomicReference lastNoLeaderTime; private final TimeDuration noLeaderTimeout; @@ -118,9 +116,6 @@ class ServerState { this.raftStorage = MemoizedCheckedSupplier.valueOf( () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop)); - this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(), - stateMachine.getStateMachineStorage()); - // On start the leader is null, start the clock now this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime()); this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop); @@ -470,18 +465,6 @@ RaftStorageImpl getStorage() { return raftStorage.getUnchecked(); } - void appendSnapshot(InstallSnapshotRequestProto request) throws IOException { - // TODO: verify that we need to install the snapshot - snapshotManager.appendSnapshot(request, server.getStateMachine()); - } - - void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { - final StateMachine sm = server.getStateMachine(); - sm.pause(); // pause the SM right before publishing the snapshot atomically - // TODO: if there is a failure here, we need to rollback the snapshot installation. - snapshotManager.finalizeSnapshot(request); - } - private SnapshotInfo getLatestSnapshot() { return server.getStateMachine().getLatestSnapshot(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 57fce8adf8..64a65e0b1a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -35,7 +35,10 @@ import org.apache.ratis.server.protocol.RaftServerProtocol.Op; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; +import org.apache.ratis.server.storage.SnapshotManager; +import org.apache.ratis.server.storage.StorageImplUtils; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.BatchLogger; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.LifeCycle; @@ -70,6 +73,7 @@ private enum BatchLogKey implements BatchLogger.Key { private final RaftServerImpl server; private final ServerState state; + private final SnapshotManager snapshotManager; private final boolean installSnapshotEnabled; private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong(INVALID_LOG_INDEX); @@ -84,6 +88,9 @@ private enum BatchLogKey implements BatchLogger.Key { SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) { this.server = server; this.state = server.getState(); + final StateMachine stateMachine = server.getStateMachine(); + this.snapshotManager = StorageImplUtils.newSnapshotManager(server.getId(), + () -> state.getStorage().getStorageDir(), stateMachine.getStateMachineStorage()); this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); } @@ -220,12 +227,14 @@ private CompletableFuture checkAndInstallSnapshot(Ins + " (the expected index is " + expectedChunkIndex + ")"); } // Append chunks to a temporary location first. Publish only when done=true. - state.appendSnapshot(request); + final StateMachine stateMachine = server.getStateMachine(); + snapshotManager.appendSnapshot(request, stateMachine); nextChunkIndex.incrementAndGet(); // update the committed index // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { - state.finalizeSnapshot(request); + stateMachine.pause(); // pause the SM right before publishing the snapshot atomically + snapshotManager.finalizeSnapshot(request); state.reloadStateMachine(lastIncluded); chunk0CallId.set(-1); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index d2d44e3d40..bf3c0ab9a4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -172,10 +172,7 @@ public void appendSnapshot(InstallSnapshotRequestProto request, StateMachine sta public void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); - if (!snapshotChunkRequest.getDone()) { - throw new IOException("Cannot finalize incomplete snapshot request: " - + ServerStringUtils.toInstallSnapshotRequestString(request)); - } + Preconditions.assertTrue(snapshotChunkRequest.getDone()); final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); rename(tmpDir, snapshotDir.get()); } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java similarity index 87% rename from ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java rename to ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java index 593642698c..3b460632d4 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/storage/SnapshotManagerTest.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java @@ -27,11 +27,11 @@ import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.SizeInBytes; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.File; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.security.MessageDigest; @@ -39,25 +39,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class SnapshotManagerTest { - private static final class TestRaftStorageDirectory implements RaftStorageDirectory { - private final File root; - - private TestRaftStorageDirectory(File root) { - this.root = root; - } - - @Override - public File getRoot() { - return root; - } - - @Override - public boolean isHealthy() { - return true; - } - } - +public class TestSnapshotManager { private static final StateMachineStorage EMPTY_STORAGE = new StateMachineStorage() { @Override public void init(RaftStorage raftStorage) { @@ -113,8 +95,9 @@ private static byte[] md5(byte[] data) throws Exception { public void testAppendOnlyAndFinalizePublish() throws Exception { final File root = Files.createTempDirectory("snapshot-manager-test").toFile(); try { + final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO); final SnapshotManager manager = new SnapshotManager( - RaftPeerId.valueOf("s1"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + RaftPeerId.valueOf("s1"), () -> storageDir, EMPTY_STORAGE); final StateMachine stateMachine = mock(StateMachine.class); when(stateMachine.getLatestSnapshot()).thenReturn(null); @@ -157,16 +140,15 @@ public void testAppendOnlyAndFinalizePublish() throws Exception { public void testFinalizeSnapshotRejectsIncompleteRequest() throws Exception { final File root = Files.createTempDirectory("snapshot-manager-test-incomplete").toFile(); try { + final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO); final SnapshotManager manager = new SnapshotManager( - RaftPeerId.valueOf("s2"), () -> new TestRaftStorageDirectory(root), EMPTY_STORAGE); + RaftPeerId.valueOf("s2"), () -> storageDir, EMPTY_STORAGE); final InstallSnapshotRequestProto incomplete = newSnapshotRequest( "request-2", 0, false, new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "f.snapshot").toString(), "abc".getBytes(StandardCharsets.UTF_8), 0, 3, true); - final IOException ioe = Assertions.assertThrows(IOException.class, - () -> manager.finalizeSnapshot(incomplete)); - Assertions.assertTrue(ioe.getMessage().contains("Cannot finalize incomplete snapshot request")); + Assertions.assertThrows(IllegalStateException.class, () -> manager.finalizeSnapshot(incomplete)); } finally { FileUtils.deleteFully(root); }