diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index bcf11baf7a..ad3210d634 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -31,7 +31,6 @@ import org.apache.ratis.server.raftlog.memory.MemoryRaftLog; import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; import org.apache.ratis.server.storage.*; -import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; @@ -69,7 +68,6 @@ class ServerState { private final MemoizedSupplier stateMachineUpdater; /** local storage for log and snapshot */ private final MemoizedCheckedSupplier raftStorage; - private final SnapshotManager snapshotManager; private final AtomicReference lastNoLeaderTime; private final TimeDuration noLeaderTimeout; @@ -118,9 +116,6 @@ class ServerState { this.raftStorage = MemoizedCheckedSupplier.valueOf( () -> StorageImplUtils.initRaftStorage(storageDirName, option, prop)); - this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(), - stateMachine.getStateMachineStorage()); - // On start the leader is null, start the clock now this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime()); this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop); @@ -470,13 +465,6 @@ RaftStorageImpl getStorage() { return raftStorage.getUnchecked(); } - void installSnapshot(InstallSnapshotRequestProto request) throws IOException { - // TODO: verify that we need to install the snapshot - StateMachine sm = server.getStateMachine(); - sm.pause(); // pause the SM to prepare for install snapshot - snapshotManager.installSnapshot(request, sm); - } - private SnapshotInfo getLatestSnapshot() { return server.getStateMachine().getLatestSnapshot(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java index 46b6aaf87f..64a65e0b1a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java @@ -35,7 +35,10 @@ import org.apache.ratis.server.protocol.RaftServerProtocol.Op; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogProtoUtils; +import org.apache.ratis.server.storage.SnapshotManager; +import org.apache.ratis.server.storage.StorageImplUtils; import org.apache.ratis.server.util.ServerStringUtils; +import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.BatchLogger; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.LifeCycle; @@ -70,6 +73,7 @@ private enum BatchLogKey implements BatchLogger.Key { private final RaftServerImpl server; private final ServerState state; + private final SnapshotManager snapshotManager; private final boolean installSnapshotEnabled; private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong(INVALID_LOG_INDEX); @@ -84,6 +88,9 @@ private enum BatchLogKey implements BatchLogger.Key { SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) { this.server = server; this.state = server.getState(); + final StateMachine stateMachine = server.getStateMachine(); + this.snapshotManager = StorageImplUtils.newSnapshotManager(server.getId(), + () -> state.getStorage().getStorageDir(), stateMachine.getStateMachineStorage()); this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); } @@ -214,17 +221,20 @@ private CompletableFuture checkAndInstallSnapshot(Ins return future.thenApply(dummy -> reply); } - //TODO: We should only update State with installed snapshot once the request is done. - state.installSnapshot(request); - - final int expectedChunkIndex = nextChunkIndex.getAndIncrement(); + final int expectedChunkIndex = nextChunkIndex.get(); if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) { throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex() + " (the expected index is " + expectedChunkIndex + ")"); } + // Append chunks to a temporary location first. Publish only when done=true. + final StateMachine stateMachine = server.getStateMachine(); + snapshotManager.appendSnapshot(request, stateMachine); + nextChunkIndex.incrementAndGet(); // update the committed index // re-load the state machine if this is the last chunk if (snapshotChunkRequest.getDone()) { + stateMachine.pause(); // pause the SM right before publishing the snapshot atomically + snapshotManager.finalizeSnapshot(request); state.reloadStateMachine(lastIncluded); chunk0CallId.set(-1); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index a96001b598..bf3c0ab9a4 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -99,12 +99,16 @@ private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOEx return out; } - public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { + private File getSnapshotTmpDir(String requestId) { + return new File(this.snapshotTmpDir.get(), "snapshot-" + requestId); + } + + public void appendSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException { final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex(); // create a unique temporary directory - final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId()); + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); FileUtils.createDirectories(tmpDir); tmpDir.deleteOnExit(); @@ -164,9 +168,13 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st } } - if (snapshotChunkRequest.getDone()) { - rename(tmpDir, snapshotDir.get()); - } + } + + public void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException { + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk(); + Preconditions.assertTrue(snapshotChunkRequest.getDone()); + final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId()); + rename(tmpDir, snapshotDir.get()); } private static void rename(File tmpDir, File stateMachineDir) throws IOException { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java new file mode 100644 index 0000000000..3b460632d4 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestSnapshotManager.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.storage; + +import org.apache.ratis.proto.RaftProtos.FileChunkProto; +import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.SnapshotRetentionPolicy; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.StateMachineStorage; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.SizeInBytes; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.security.MessageDigest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSnapshotManager { + private static final StateMachineStorage EMPTY_STORAGE = new StateMachineStorage() { + @Override + public void init(RaftStorage raftStorage) { + } + + @Override + public SnapshotInfo getLatestSnapshot() { + return null; + } + + @Override + public void format() { + } + + @Override + public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) { + } + }; + + private static InstallSnapshotRequestProto newSnapshotRequest( + String requestId, int requestIndex, boolean requestDone, String filename, byte[] fullData, + int offset, int chunkSize, boolean chunkDone) throws Exception { + final FileChunkProto fileChunk = FileChunkProto.newBuilder() + .setFilename(filename) + .setTotalSize(fullData.length) + .setFileDigest(ByteString.copyFrom(md5(fullData))) + .setChunkIndex(requestIndex) + .setOffset(offset) + .setData(ByteString.copyFrom(fullData, offset, chunkSize)) + .setDone(chunkDone) + .build(); + + final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk = + InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder() + .setRequestId(requestId) + .setRequestIndex(requestIndex) + .setTermIndex(TermIndex.valueOf(1L, 10L).toProto()) + .addFileChunks(fileChunk) + .setTotalSize(fullData.length) + .setDone(requestDone) + .build(); + + return InstallSnapshotRequestProto.newBuilder() + .setSnapshotChunk(snapshotChunk) + .build(); + } + + private static byte[] md5(byte[] data) throws Exception { + return MessageDigest.getInstance("MD5").digest(data); + } + + @Test + public void testAppendOnlyAndFinalizePublish() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test").toFile(); + try { + final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO); + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s1"), () -> storageDir, EMPTY_STORAGE); + final StateMachine stateMachine = mock(StateMachine.class); + when(stateMachine.getLatestSnapshot()).thenReturn(null); + + final File stateMachineDir = new File(root, RaftStorageDirectory.STATE_MACHINE_DIR_NAME); + FileUtils.createDirectories(stateMachineDir); + final File oldSnapshot = new File(stateMachineDir, "old.snapshot"); + Files.write(oldSnapshot.toPath(), "old".getBytes(StandardCharsets.UTF_8)); + + final byte[] fullData = "0123456789abcdef".getBytes(StandardCharsets.UTF_8); + final File newSnapshot = new File(stateMachineDir, "new.snapshot"); + final String filename = new File( + RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "new.snapshot").toString(); + final File tmpSnapshot = new File(new File( + new File(root, RaftStorageDirectory.TMP_DIR_NAME), "snapshot-request-1"), "new.snapshot"); + final InstallSnapshotRequestProto chunk0 = newSnapshotRequest( + "request-1", 0, false, filename, fullData, 0, 8, false); + final InstallSnapshotRequestProto chunk1 = newSnapshotRequest( + "request-1", 1, true, filename, fullData, 8, 8, true); + + manager.appendSnapshot(chunk0, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.appendSnapshot(chunk1, stateMachine); + Assertions.assertTrue(oldSnapshot.exists()); + Assertions.assertTrue(tmpSnapshot.exists()); + Assertions.assertFalse(newSnapshot.exists()); + + manager.finalizeSnapshot(chunk1); + Assertions.assertFalse(oldSnapshot.exists()); + Assertions.assertFalse(tmpSnapshot.exists()); + Assertions.assertArrayEquals(fullData, Files.readAllBytes(newSnapshot.toPath())); + } finally { + FileUtils.deleteFully(root); + } + } + + @Test + public void testFinalizeSnapshotRejectsIncompleteRequest() throws Exception { + final File root = Files.createTempDirectory("snapshot-manager-test-incomplete").toFile(); + try { + final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO); + final SnapshotManager manager = new SnapshotManager( + RaftPeerId.valueOf("s2"), () -> storageDir, EMPTY_STORAGE); + + final InstallSnapshotRequestProto incomplete = newSnapshotRequest( + "request-2", 0, false, + new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "f.snapshot").toString(), + "abc".getBytes(StandardCharsets.UTF_8), 0, 3, true); + Assertions.assertThrows(IllegalStateException.class, () -> manager.finalizeSnapshot(incomplete)); + } finally { + FileUtils.deleteFully(root); + } + } +}