[Raft] Add Raft persistence#1470
Conversation
166d5d0 to
5759374
Compare
| return stacktrace.Propagate(err, "failed to save snapshot to wal") | ||
| } | ||
|
|
||
| return s.wal.ReleaseLockTo(snapshot.Metadata.Index) |
There was a problem hiding this comment.
Please add a comment on where the lock was acquired
There was a problem hiding this comment.
+1
And does the lock need to be released when an error is returned?
There was a problem hiding this comment.
These are os level flock locks on WAL segment files that are acquired internally in the WAL implementation. ReleaseLockTo is used to release the locks for the entries that are already covered by the snapshot and, thus, the files are not needed anymore.
We shouldn't release the locks when an error is returned since the files are still needed if we fail to save the snapshot.
I realized, while looking into etcd's source code, that this call is actually useless in the current state of our implementation. Etcd uses this as a way to mark files that can be cleaned up (separately). I will keep it for now and add a comment to the function and the github issue to implement the cleanup later.
There was a problem hiding this comment.
Please also include this rationale in the comment in code :)
|
Thank you @MariemBaccari, please find some suggestions inline. |
| func init() { | ||
| flag.Uint64Var(&connectParameters.ID, "raft_node_id", 0, "raft node ID for this instance (must be non-zero and unique within the cluster)") | ||
| flag.StringVar(&connectParameters.Peers, "raft_peers", "", `comma-separated "nodeID=peerURL" pairs for all cluster members, including the current node, e.g. "1=http://node1:9021,2=http://node2:9021,3=http://node3:9021"`) | ||
| flag.StringVar(&connectParameters.DataDir, "raft_data_directory", defaultDataDir, "directory for raft data (snapshot and WAL storage)") |
There was a problem hiding this comment.
Whenever we write stuff to the local filesystem that becomes a something the operations need to deal with. So somehow, somewhere, we need documentation about how to handle the data in that directory. Is it temp data that can be trashed? If trashed how long does it take to reconstruct it? If not temp, does it need to be backed up? What happens if it is lost? How large can it become? Can it be hot swapped? Is there any impact from the underlying storage (e.g. if it is too slow?) etc.
There was a problem hiding this comment.
e.g. I suspect snapshotCatchUpEntriesN has an impact on the size of file on filesytem: have this configurable and document it
There was a problem hiding this comment.
I clarified the raft_datadir flag description, do you think that's sufficient for the moment ? I also added a cleanup task in #1463. I think the implementation of that will come with more documentation regarding the growth of the data directory.
There was a problem hiding this comment.
I clarified the
raft_datadirflag description, do you think that's sufficient for the moment ? I also added a cleanup task in #1463. I think the implementation of that will come with more documentation regarding the growth of the data directory.
This should be part of the user documentation - so not in the code necessarily (although code comment is useful too). I'd say as long as it is tracked that it is to be included in the documentation we are OK. I can't find on #1463 though?
There was a problem hiding this comment.
I had added the "Old files cleanup" task to "future changes". I just specified the user documentation part now.
62208b4 to
610ad5f
Compare
mickmis
left a comment
There was a problem hiding this comment.
Not sure if already suggested by @barroco, but using https://pkg.go.dev/go.etcd.io/raft/v3@v3.6.0/rafttest for testing would be quite important (not necessarily in this PR, but important to do in general)
| return nil, false, stacktrace.Propagate(err, "failed to create directory for wal storage at: %s", walPath) | ||
| } | ||
|
|
||
| w, err := wal.Create(logger, walPath, nil) |
There was a problem hiding this comment.
w is redefined here. If that is intended, why declaring var w above?
There was a problem hiding this comment.
Thanks for catching this, I did not intend to redefine it.
| return stacktrace.Propagate(err, "failed to save snapshot to wal") | ||
| } | ||
|
|
||
| return s.wal.ReleaseLockTo(snapshot.Metadata.Index) |
There was a problem hiding this comment.
Please also include this rationale in the comment in code :)
| } | ||
|
|
||
| func loadSnapshot(logger *zap.Logger, walPath string, snapshotter *snap.Snapshotter) (*raftpb.Snapshot, error) { | ||
| if !wal.Exist(walPath) { |
There was a problem hiding this comment.
This check is done again right after the only call to loadSnapshot: looks like the control flow could be clarified a bit. Is the intent to load a snapshot only if the wal does not exist? If so, what about removing this here and calling loadSnapshot in an else of if !ok?
There was a problem hiding this comment.
The snapshot is loaded only if the wal does already exist but yes it's better to move the load in an else, the Exist call is redundant. Will fix this.
| } | ||
|
|
||
| // getSnapshot calls all registered snapshot providers and combines their data into a single snapshot. | ||
| func (s *storage) getSnapshot() ([]byte, error) { |
There was a problem hiding this comment.
Looks like this fits json.Marshaler interface: https://pkg.go.dev/encoding/json#Marshaler
There was a problem hiding this comment.
That's true but I think we should keep this method signature as it clearly indicates the specific purpose of getting a snapshot of the storage. The underlying implementation can always change and just happens to be a json marshalling for the moment.
| return stacktrace.Propagate(err, "failed to save WAL entries") | ||
| } | ||
|
|
||
| if !raft.IsEmptySnap(snapshot) { |
There was a problem hiding this comment.
Control flow: is it actually necessary to check for this condition twice?
There was a problem hiding this comment.
Since we cannot change the ordering of saving the snapshot -> saving the entries -> applying, I think we have to check for this condition twice.
| // newStorage initializes the storage by loading the latest snapshot and wal entries from the disk | ||
| // and applies them to the Raft memory storage. | ||
| // It returns the initialized storage, a boolean indicating whether the storage was pre-existent or an error. | ||
| func newStorage(ctx context.Context, logger *zap.Logger, dataDir string, nodeID uint64, snapshotCatchUpEntries uint64) (*storage, bool, error) { |
There was a problem hiding this comment.
And an additional clarification: is this storage OK in a cluster environment where there are multiple DSS pods? Is this the intent of the nodeID or not at all?
There was a problem hiding this comment.
Yes, that is the intent of nodeID :) I can run multiple nodes locally safely.
610ad5f to
78db271
Compare
78db271 to
090d9a6
Compare
This PR is part of the chain #1470 -> #1473 -> #1474 to address the issue #1463.
It adds WAL and snapshot persistency and management for the raftstore. Contrary to the diagrams from the issue #1463, we embed
*Raft.MemoryStoragein thestorageinstead of setting it as a field inConsensusto avoid scattering the storage management.