Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.*;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
Expand Down Expand Up @@ -69,7 +68,6 @@ class ServerState {
private final MemoizedSupplier<StateMachineUpdater> stateMachineUpdater;
/** local storage for log and snapshot */
private final MemoizedCheckedSupplier<RaftStorageImpl, IOException> raftStorage;
private final SnapshotManager snapshotManager;
private final AtomicReference<Timestamp> lastNoLeaderTime;
private final TimeDuration noLeaderTimeout;

Expand Down Expand Up @@ -118,9 +116,6 @@ class ServerState {
this.raftStorage = MemoizedCheckedSupplier.valueOf(
() -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));

this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(),
stateMachine.getStateMachineStorage());

// On start the leader is null, start the clock now
this.lastNoLeaderTime = new AtomicReference<>(Timestamp.currentTime());
this.noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(prop);
Expand Down Expand Up @@ -470,13 +465,6 @@ RaftStorageImpl getStorage() {
return raftStorage.getUnchecked();
}

void installSnapshot(InstallSnapshotRequestProto request) throws IOException {
// TODO: verify that we need to install the snapshot
StateMachine sm = server.getStateMachine();
sm.pause(); // pause the SM to prepare for install snapshot
snapshotManager.installSnapshot(request, sm);
}

private SnapshotInfo getLatestSnapshot() {
return server.getStateMachine().getLatestSnapshot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import org.apache.ratis.server.protocol.RaftServerProtocol.Op;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.SnapshotManager;
import org.apache.ratis.server.storage.StorageImplUtils;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.BatchLogger;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
Expand Down Expand Up @@ -70,6 +73,7 @@ private enum BatchLogKey implements BatchLogger.Key {

private final RaftServerImpl server;
private final ServerState state;
private final SnapshotManager snapshotManager;

private final boolean installSnapshotEnabled;
private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong(INVALID_LOG_INDEX);
Expand All @@ -84,6 +88,9 @@ private enum BatchLogKey implements BatchLogger.Key {
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
this.server = server;
this.state = server.getState();
final StateMachine stateMachine = server.getStateMachine();
this.snapshotManager = StorageImplUtils.newSnapshotManager(server.getId(),
() -> state.getStorage().getStorageDir(), stateMachine.getStateMachineStorage());
this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
}

Expand Down Expand Up @@ -214,17 +221,20 @@ private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(Ins
return future.thenApply(dummy -> reply);
}

//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);

final int expectedChunkIndex = nextChunkIndex.getAndIncrement();
final int expectedChunkIndex = nextChunkIndex.get();
if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) {
throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex()
+ " (the expected index is " + expectedChunkIndex + ")");
}
// Append chunks to a temporary location first. Publish only when done=true.
final StateMachine stateMachine = server.getStateMachine();
snapshotManager.appendSnapshot(request, stateMachine);
nextChunkIndex.incrementAndGet();
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
stateMachine.pause(); // pause the SM right before publishing the snapshot atomically
snapshotManager.finalizeSnapshot(request);
state.reloadStateMachine(lastIncluded);
chunk0CallId.set(-1);
Comment on lines +224 to 239

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move snapshotManager to SnapshotInstallationHandler. It is only used here.

        final int expectedChunkIndex = nextChunkIndex.get();
        if (expectedChunkIndex != snapshotChunkRequest.getRequestIndex()) {
          throw new IOException("Unexpected request chunk index: " + snapshotChunkRequest.getRequestIndex()
              + " (the expected index is " + expectedChunkIndex + ")");
        }
        // Append chunks to a temporary location first. Publish only when done=true.
        final StateMachine stateMachine = server.getStateMachine();
        snapshotManager.appendSnapshot(request, stateMachine);
        nextChunkIndex.incrementAndGet();
        // update the committed index
        // re-load the state machine if this is the last chunk
        if (snapshotChunkRequest.getDone()) {
          stateMachine.pause(); // pause the SM right before publishing the snapshot atomically
          snapshotManager.finalizeSnapshot(request);

          state.reloadStateMachine(lastIncluded);
          chunk0CallId.set(-1);
        }

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,16 @@ private FileChannel open(FileChunkProto chunk, File tmpSnapshotFile) throws IOEx
return out;
}

public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException {
private File getSnapshotTmpDir(String requestId) {
return new File(this.snapshotTmpDir.get(), "snapshot-" + requestId);
}

public void appendSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine) throws IOException {
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();

// create a unique temporary directory
final File tmpDir = new File(this.snapshotTmpDir.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();

Expand Down Expand Up @@ -164,9 +168,13 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
}
}

if (snapshotChunkRequest.getDone()) {
rename(tmpDir, snapshotDir.get());
}
}

public void finalizeSnapshot(InstallSnapshotRequestProto request) throws IOException {
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
Preconditions.assertTrue(snapshotChunkRequest.getDone());
final File tmpDir = getSnapshotTmpDir(snapshotChunkRequest.getRequestId());
rename(tmpDir, snapshotDir.get());
}

private static void rename(File tmpDir, File stateMachineDir) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.storage;

import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.MessageDigest;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestSnapshotManager {
private static final StateMachineStorage EMPTY_STORAGE = new StateMachineStorage() {
@Override
public void init(RaftStorage raftStorage) {
}

@Override
public SnapshotInfo getLatestSnapshot() {
return null;
}

@Override
public void format() {
}

@Override
public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) {
}
};

private static InstallSnapshotRequestProto newSnapshotRequest(
String requestId, int requestIndex, boolean requestDone, String filename, byte[] fullData,
int offset, int chunkSize, boolean chunkDone) throws Exception {
final FileChunkProto fileChunk = FileChunkProto.newBuilder()
.setFilename(filename)
.setTotalSize(fullData.length)
.setFileDigest(ByteString.copyFrom(md5(fullData)))
.setChunkIndex(requestIndex)
.setOffset(offset)
.setData(ByteString.copyFrom(fullData, offset, chunkSize))
.setDone(chunkDone)
.build();

final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunk =
InstallSnapshotRequestProto.SnapshotChunkProto.newBuilder()
.setRequestId(requestId)
.setRequestIndex(requestIndex)
.setTermIndex(TermIndex.valueOf(1L, 10L).toProto())
.addFileChunks(fileChunk)
.setTotalSize(fullData.length)
.setDone(requestDone)
.build();

return InstallSnapshotRequestProto.newBuilder()
.setSnapshotChunk(snapshotChunk)
.build();
}

private static byte[] md5(byte[] data) throws Exception {
return MessageDigest.getInstance("MD5").digest(data);
}

@Test
public void testAppendOnlyAndFinalizePublish() throws Exception {
final File root = Files.createTempDirectory("snapshot-manager-test").toFile();
try {
final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO);
final SnapshotManager manager = new SnapshotManager(
RaftPeerId.valueOf("s1"), () -> storageDir, EMPTY_STORAGE);
final StateMachine stateMachine = mock(StateMachine.class);
when(stateMachine.getLatestSnapshot()).thenReturn(null);

final File stateMachineDir = new File(root, RaftStorageDirectory.STATE_MACHINE_DIR_NAME);
FileUtils.createDirectories(stateMachineDir);
final File oldSnapshot = new File(stateMachineDir, "old.snapshot");
Files.write(oldSnapshot.toPath(), "old".getBytes(StandardCharsets.UTF_8));

final byte[] fullData = "0123456789abcdef".getBytes(StandardCharsets.UTF_8);
final File newSnapshot = new File(stateMachineDir, "new.snapshot");
final String filename = new File(
RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "new.snapshot").toString();
final File tmpSnapshot = new File(new File(
new File(root, RaftStorageDirectory.TMP_DIR_NAME), "snapshot-request-1"), "new.snapshot");
final InstallSnapshotRequestProto chunk0 = newSnapshotRequest(
"request-1", 0, false, filename, fullData, 0, 8, false);
final InstallSnapshotRequestProto chunk1 = newSnapshotRequest(
"request-1", 1, true, filename, fullData, 8, 8, true);

manager.appendSnapshot(chunk0, stateMachine);
Assertions.assertTrue(oldSnapshot.exists());
Assertions.assertTrue(tmpSnapshot.exists());
Assertions.assertFalse(newSnapshot.exists());

manager.appendSnapshot(chunk1, stateMachine);
Assertions.assertTrue(oldSnapshot.exists());
Assertions.assertTrue(tmpSnapshot.exists());
Assertions.assertFalse(newSnapshot.exists());

manager.finalizeSnapshot(chunk1);
Assertions.assertFalse(oldSnapshot.exists());
Assertions.assertFalse(tmpSnapshot.exists());
Assertions.assertArrayEquals(fullData, Files.readAllBytes(newSnapshot.toPath()));
} finally {
FileUtils.deleteFully(root);
}
}

@Test
public void testFinalizeSnapshotRejectsIncompleteRequest() throws Exception {
final File root = Files.createTempDirectory("snapshot-manager-test-incomplete").toFile();
try {
final RaftStorageDirectoryImpl storageDir = new RaftStorageDirectoryImpl(root, SizeInBytes.ZERO);
final SnapshotManager manager = new SnapshotManager(
RaftPeerId.valueOf("s2"), () -> storageDir, EMPTY_STORAGE);

final InstallSnapshotRequestProto incomplete = newSnapshotRequest(
"request-2", 0, false,
new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, "f.snapshot").toString(),
"abc".getBytes(StandardCharsets.UTF_8), 0, 3, true);
Assertions.assertThrows(IllegalStateException.class, () -> manager.finalizeSnapshot(incomplete));
} finally {
FileUtils.deleteFully(root);
}
}
}