Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"permissions": {
"allow": [
"Bash(go vet *)",
"Bash(go test *)",
"Bash(go build *)",
"Bash(python3 -c ' *)"
]
}
}
41 changes: 40 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: CI

on:
push:
branches: [main]
branches: [main, "feature/**"]
pull_request:
branches: [main]

Expand All @@ -16,3 +16,42 @@ jobs:
go-version: "1.23"
- run: go vet ./...
- run: go build ./...

integration:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: "1.23"

- name: Build server
run: go build -o bin/ban-server ./Server

- name: Build benchmark
run: go build -o bin/ban-bench ./benchmark

- name: Start server
run: ./bin/ban-server &
env:
GOMEMLIMIT: 512MiB

- name: Wait for server ready
run: |
for i in $(seq 1 30); do
if nc -z localhost 8080 2>/dev/null; then
echo "Server ready after ${i}s"
exit 0
fi
sleep 1
done
echo "Server failed to start"
exit 1

- name: Run benchmark
run: ./bin/ban-bench -d 5s -c 4 -n 1000 -mode mixed

- name: Verify server alive
run: |
nc -z localhost 8080 && echo "Server still healthy"
30 changes: 30 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# 协作规范

## 1. 方案先行
处理任何问题时,先提出**多种可选方案**让我选择,不要直接动手。
- 列出每种方案的思路、优缺点、影响范围。
- 如果你有更优方案,明确标注并说明理由。
- 我选定方案后,再开始实现。

## 2. 外科手术式修改
每次修改只能精确到目标,**不允许顺带改动**。
- 不修改与当前任务无关的代码(即使你认为"顺手优化")。
- 不重命名、不重排、不格式化无关行。
- 不引入额外的抽象、注释、依赖。
- 若发现其它问题,单独提出,不要混入当前改动。

## 3. 原子性 commit
每完成一个独立部分立即 commit,**无需询问**。
- 一个 commit 只包含一件事,不要把多个改动合并提交。
- commit message 用一句话总结本次改动的内容与目的。
- 改动完一部分就立刻提交,再继续下一部分。
- 自动执行 commit,不要等待我的确认。

## 4. 自动 PR + rebase merge
每次 commit 并 push 后,**自动执行以下流程,无需询问**:
- 若该分支尚未有 PR,立即 `gh pr create` 创建。
- 若已有 PR,不再重复创建。
- 创建 PR 后,使用 `gh pr merge --rebase --auto` 开启 GitHub 自动合并。
- GitHub 会在 **CI 全部通过** 后自动 rebase merge 到 main。
- **CI 不通过绝不 merge**,必须先修复问题重新 commit。
- 如果 merge 冲突,停止并告知我需要手动解决。
77 changes: 54 additions & 23 deletions Raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package Raft
import (
"encoding/binary"
"fmt"
"log/slog"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -95,7 +96,7 @@ func NewRaft(peers []string, me int) *Raft {

// 从磁盘加载持久化状态(currentTerm, votedFor, log, snapshot metadata)
if err := r.readPersist(); err != nil {
fmt.Printf("[RAFT WARN] Failed to load persisted state: %v\n", err)
slog.Warn("failed to load persisted state", "error", err)
}

// 如果有快照,通知 FSM
Expand All @@ -110,7 +111,7 @@ func NewRaft(peers []string, me int) *Raft {
IsSnapshot: true,
}:
default:
fmt.Println("[WARN] ApplyCh is full during initialization, snapshot skipped")
slog.Warn("ApplyCh full during init, snapshot skipped")
}
}
}
Expand All @@ -122,7 +123,14 @@ func NewRaft(peers []string, me int) *Raft {
return r
}

// persistLocked 持久化 Raft 状态(必须在持有锁的情况下调用)
// persistStateLocked 仅持久化 Term 和 votedFor(增量持久化,O(1))
func (r *Raft) persistStateLocked() {
if err := r.wal.SaveState(int64(r.Term), int64(r.votedFor)); err != nil {
slog.Error("failed to persist state", "error", err)
}
}

// persistLocked 全量持久化 Raft 状态(仅用于日志冲突截断等特殊情况)
func (r *Raft) persistLocked() {
data := PersistData{
CurrentTerm: int64(r.Term),
Expand All @@ -133,7 +141,7 @@ func (r *Raft) persistLocked() {
}

if err := r.wal.SavePersist(data); err != nil {
fmt.Printf("[RAFT ERROR] Failed to persist state: %v\n", err)
slog.Error("failed to persist state", "error", err)
}
}

Expand Down Expand Up @@ -188,18 +196,21 @@ func (r *Raft) startElection() {
return
}

fmt.Printf("[RAFT] Starting election, current state=%v, Term=%d\n", r.state, r.Term)
slog.Info("starting election", "state", r.state, "term", r.Term)

r.state = Candidate
r.Term++
r.votedFor = r.me
r.persistLocked() // 持久化 Term 和 votedFor
r.persistStateLocked()

lastLogIndex := int(r.LastIncludedIndex)
lastLogTerm := int(r.LastIncludedTerm)
lastLogIndex := -1
lastLogTerm := 0
if len(r.log) > 0 {
lastLogIndex = r.log[len(r.log)-1].Index
lastLogTerm = r.log[len(r.log)-1].Term
} else if r.LastIncludedIndex > 0 {
lastLogIndex = int(r.LastIncludedIndex)
lastLogTerm = int(r.LastIncludedTerm)
}

args := &RequestVoteArgs{
Expand Down Expand Up @@ -247,6 +258,16 @@ func (r *Raft) startElection() {

r.mu.Unlock()

// 单节点模式:无需等待投票,直接成为 Leader
if peerCount == 0 {
r.mu.Lock()
if r.state == Candidate {
r.becomeLeader()
}
r.mu.Unlock()
return
}

// 等待投票结果或超时
timeout := time.After(500 * time.Millisecond)
for j := 0; j < peerCount; j++ {
Expand Down Expand Up @@ -278,21 +299,23 @@ func (r *Raft) startElection() {
}

func (r *Raft) becomeLeader() {
fmt.Printf("[RAFT] Becoming Leader, Term=%d\n", r.Term)
slog.Info("becoming leader", "term", r.Term)
r.state = Leader

// 计算下一个日志的绝对索引(考虑快照偏移)
nextLogIndex := int(r.LastIncludedIndex) + 1
nextLogIndex := 0
if len(r.log) > 0 {
nextLogIndex = r.log[len(r.log)-1].Index + 1
} else if r.LastIncludedIndex > 0 {
nextLogIndex = int(r.LastIncludedIndex) + 1
}

for i := range r.peers {
r.nextIndex[i] = nextLogIndex
r.matchIndex[i] = int(r.LastIncludedIndex)
}

fmt.Printf("[RAFT] Started heartbeat loop\n")
slog.Debug("heartbeat loop started")
r.startHeartbeatLoop()
}

Expand Down Expand Up @@ -472,10 +495,9 @@ func (r *Raft) checkSnapshotTrigger() {
if logLength > threshold {
snapshotIndex := r.commitIndex - keepEntries
if snapshotIndex > r.lastSnapshotIndex {
fmt.Printf("[RAFT] Auto-triggering snapshot at index %d (log length=%d, threshold=%d)\n",
snapshotIndex, logLength, threshold)
// 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用)
go r.TakeSnapshot(snapshotIndex)
slog.Info("auto-triggering snapshot", "index", snapshotIndex, "logLen", logLength, "threshold", threshold)
// 异步调用避免持锁死锁(checkSnapshotTrigger 在持锁上下文中被调用)
go r.TakeSnapshot(snapshotIndex)
}
}
}
Expand All @@ -500,22 +522,27 @@ func (r *Raft) getLastLogIndex() int {
if len(r.log) > 0 {
return r.log[len(r.log)-1].Index
}
return int(r.LastIncludedIndex)
if r.LastIncludedIndex > 0 {
return int(r.LastIncludedIndex)
}
return -1
}

func (r *Raft) AppendEntry(command []byte) (int, error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.state != Leader {
fmt.Printf("[RAFT] AppendEntry failed: not leader, state=%v\n", r.state)
slog.Warn("AppendEntry rejected, not leader", "state", r.state)
return -1, fmt.Errorf("not leader")
}

// 计算绝对索引(考虑快照偏移)
lastLogIndex := int(r.LastIncludedIndex)
lastLogIndex := -1
if len(r.log) > 0 {
lastLogIndex = r.log[len(r.log)-1].Index
} else if r.LastIncludedIndex > 0 {
lastLogIndex = int(r.LastIncludedIndex)
}

entry := LogEntry{
Expand All @@ -524,7 +551,11 @@ func (r *Raft) AppendEntry(command []byte) (int, error) {
Command: command,
}
r.log = append(r.log, entry)
r.persistLocked()

// 增量持久化:仅追加一条日志
if err := r.wal.AppendLog(entry); err != nil {
slog.Error("failed to append log", "error", err)
}

// 单节点模式:立即提交
if len(r.peers) == 1 {
Expand Down Expand Up @@ -736,14 +767,14 @@ func (r *Raft) TakeSnapshot(index int) error {
}
select {
case r.ApplyCh <- snapshotEntry:
fmt.Printf("[RAFT] Snapshot replay sent to FSM: Index=%d, entries=%d\n", index, len(snapshotEntries))
slog.Info("snapshot replay sent to FSM", "index", index, "entries", len(snapshotEntries))
default:
fmt.Println("[WARN] ApplyCh is full, snapshot replay skipped")
slog.Warn("ApplyCh full, snapshot replay skipped")
}
}

// 7. 持久化状态
r.persistLocked()
// 7. 持久化状态(日志已由 TruncateLogs 处理)
r.persistStateLocked()

return nil
}
Expand Down
Binary file removed Raft/raft_data/raft_log.dat
Binary file not shown.
Loading
Loading